Merge pull request #2308 from hashicorp/f-leader-task

Leader task
This commit is contained in:
Alex Dadgar 2017-02-14 11:03:32 -08:00 committed by GitHub
commit b6357f96c5
15 changed files with 179 additions and 29 deletions

View File

@ -163,6 +163,7 @@ type Task struct {
Vault *Vault
Templates []*Template
DispatchPayload *DispatchPayloadConfig
Leader *bool
}
// TaskArtifact is used to download artifacts before running a task.
@ -256,10 +257,10 @@ const (
TaskNotRestarting = "Not Restarting"
TaskDownloadingArtifacts = "Downloading Artifacts"
TaskArtifactDownloadFailed = "Failed Artifact Download"
TaskVaultRenewalFailed = "Vault token renewal failed"
TaskSiblingFailed = "Sibling task failed"
TaskSiblingFailed = "Sibling Task Failed"
TaskSignaling = "Signaling"
TaskRestartSignal = "Restart Signaled"
TaskLeaderDead = "Leader Task Dead"
)
// TaskEvent is an event that effects the state of a task and contains meta-data

View File

@ -383,19 +383,39 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
taskState.State = state
if state == structs.TaskStateDead {
// If the task failed, we should kill all the other tasks in the task group.
if taskState.Failed {
var destroyingTasks []string
// Find all tasks that are not the one that is dead and check if the one
// that is dead is a leader
var otherTaskRunners []*TaskRunner
var otherTaskNames []string
leader := false
for task, tr := range r.tasks {
if task != taskName {
destroyingTasks = append(destroyingTasks, task)
otherTaskRunners = append(otherTaskRunners, tr)
otherTaskNames = append(otherTaskNames, task)
} else if tr.task.Leader {
leader = true
}
}
// If the task failed, we should kill all the other tasks in the task group.
if taskState.Failed {
for _, tr := range otherTaskRunners {
tr.Destroy(structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName))
}
if len(otherTaskRunners) > 0 {
r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, otherTaskNames)
}
if len(destroyingTasks) > 0 {
r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, destroyingTasks)
} else if leader {
for _, tr := range otherTaskRunners {
tr.Destroy(structs.NewTaskEvent(structs.TaskLeaderDead))
}
if len(otherTaskRunners) > 0 {
r.logger.Printf("[DEBUG] client: leader task %q is dead, destroying other tasks in task group: %v", taskName, otherTaskNames)
}
}
// If the task was a leader task we should kill all the other
// tasks.
}
select {

View File

@ -663,6 +663,70 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
})
}
// TestAllocRunner_TaskLeader_KillTG verifies that when a leader task in a
// task group completes, the remaining (non-leader) tasks are destroyed and
// receive a TaskLeaderDead event.
func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
	upd, ar := testAllocRunner(false)

	// Create two tasks in the task group: a long-running follower and a
	// short-lived leader whose exit should tear down the follower.
	task := ar.alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.KillTimeout = 10 * time.Millisecond
	task.Config = map[string]interface{}{
		"run_for": "10s",
	}

	task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
	task2.Name = "task 2"
	task2.Driver = "mock_driver"
	task2.Leader = true
	task2.Config = map[string]interface{}{
		"run_for": "1s",
	}
	ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2)
	ar.alloc.TaskResources[task2.Name] = task2.Resources
	go ar.Run()

	testutil.WaitForResult(func() (bool, error) {
		if upd.Count == 0 {
			return false, fmt.Errorf("No updates")
		}
		last := upd.Allocs[upd.Count-1]
		if last.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
		}

		// Task One (the follower) should have been killed when the leader exited.
		state1 := last.TaskStates[task.Name]
		if state1.State != structs.TaskStateDead {
			return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead)
		}
		if len(state1.Events) < 2 {
			// At least have a received and destroyed
			return false, fmt.Errorf("Unexpected number of events")
		}

		// The follower must carry a TaskLeaderDead event explaining why it died.
		// NOTE: the comparison must be ==; the original used != which made
		// this check pass on ANY event type, rendering the assertion vacuous.
		found := false
		for _, e := range state1.Events {
			if e.Type == structs.TaskLeaderDead {
				found = true
			}
		}
		if !found {
			return false, fmt.Errorf("Did not find event %v", structs.TaskLeaderDead)
		}

		// Task Two (the leader) should be dead after its 1s run_for elapses.
		state2 := last.TaskStates[task2.Name]
		if state2.State != structs.TaskStateDead {
			return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
func TestAllocRunner_MoveAllocDir(t *testing.T) {
// Create an alloc runner
alloc := mock.Alloc()

View File

@ -349,12 +349,6 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
} else {
desc = "Task exceeded restart policy"
}
case api.TaskVaultRenewalFailed:
if event.VaultError != "" {
desc = event.VaultError
} else {
desc = "Task's Vault token failed to be renewed"
}
case api.TaskSiblingFailed:
if event.FailedSibling != "" {
desc = fmt.Sprintf("Task's sibling %q failed", event.FailedSibling)
@ -382,6 +376,8 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
}
case api.TaskDriverMessage:
desc = event.DriverMessage
case api.TaskLeaderDead:
desc = "Leader Task in Group dead"
}
// Reverse order so we are sorted by time

View File

@ -568,6 +568,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
"driver",
"env",
"kill_timeout",
"leader",
"logs",
"meta",
"resources",

View File

@ -181,6 +181,7 @@ func TestParse(t *testing.T) {
Perms: "777",
},
},
Leader: true,
},
&structs.Task{
Name: "storagelocker",

View File

@ -50,6 +50,7 @@ job "binstore-storagelocker" {
task "binstore" {
driver = "docker"
user = "bob"
leader = true
config {
image = "hashicorp/binstore"

View File

@ -403,7 +403,7 @@ func (t *Task) Diff(other *Task, contextual bool) (*TaskDiff, error) {
diff.Objects = append(diff.Objects, vDiff)
}
// Artifacts diff
// Template diff
tmplDiffs := primitiveObjectSetDiff(
interfaceSlice(t.Templates),
interfaceSlice(other.Templates),

View File

@ -1777,6 +1777,12 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "",
New: "0",
},
{
Type: DiffTypeAdded,
Name: "Leader",
Old: "",
New: "false",
},
},
},
{
@ -1811,6 +1817,12 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "0",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Leader",
Old: "false",
New: "",
},
},
},
},
@ -1880,6 +1892,7 @@ func TestTaskDiff(t *testing.T) {
"foo": "bar",
},
KillTimeout: 1 * time.Second,
Leader: true,
},
New: &Task{
Name: "foo",
@ -1892,6 +1905,7 @@ func TestTaskDiff(t *testing.T) {
"foo": "bar",
},
KillTimeout: 1 * time.Second,
Leader: true,
},
Expected: &TaskDiff{
Type: DiffTypeNone,
@ -1911,6 +1925,7 @@ func TestTaskDiff(t *testing.T) {
"foo": "bar",
},
KillTimeout: 1 * time.Second,
Leader: true,
},
New: &Task{
Name: "foo",
@ -1923,6 +1938,7 @@ func TestTaskDiff(t *testing.T) {
"foo": "baz",
},
KillTimeout: 2 * time.Second,
Leader: false,
},
Expected: &TaskDiff{
Type: DiffTypeEdited,
@ -1946,6 +1962,12 @@ func TestTaskDiff(t *testing.T) {
Old: "1000000000",
New: "2000000000",
},
{
Type: DiffTypeEdited,
Name: "Leader",
Old: "true",
New: "false",
},
{
Type: DiffTypeEdited,
Name: "Meta[foo]",

View File

@ -1937,8 +1937,9 @@ func (tg *TaskGroup) Validate() error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have an ephemeral disk object", tg.Name))
}
// Check for duplicate tasks
// Check for duplicate tasks and that there is only leader task if any
tasks := make(map[string]int)
leaderTasks := 0
for idx, task := range tg.Tasks {
if task.Name == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %d missing name", idx+1))
@ -1947,6 +1948,14 @@ func (tg *TaskGroup) Validate() error {
} else {
tasks[task.Name] = idx
}
if task.Leader {
leaderTasks++
}
}
if leaderTasks > 1 {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Only one task may be marked as leader"))
}
// Validate the tasks
@ -2295,6 +2304,10 @@ type Task struct {
// Artifacts is a list of artifacts to download and extract before running
// the task.
Artifacts []*TaskArtifact
// Leader marks the task as the leader within the group. When the leader
// task exits, other tasks will be gracefully terminated.
Leader bool
}
func (t *Task) Copy() *Task {
@ -2781,12 +2794,15 @@ const (
// TaskSiblingFailed indicates that a sibling task in the task group has
// failed.
TaskSiblingFailed = "Sibling task failed"
TaskSiblingFailed = "Sibling Task Failed"
// TaskDriverMessage is an informational event message emitted by
// drivers such as when they're performing a long running action like
// downloading an image.
TaskDriverMessage = "Driver"
// TaskLeaderDead indicates that the leader task within the group has finished.
TaskLeaderDead = "Leader Task Dead"
)
// TaskEvent is an event that effects the state of a task and contains meta-data

View File

@ -419,8 +419,8 @@ func TestTaskGroup_Validate(t *testing.T) {
Name: "web",
Count: 1,
Tasks: []*Task{
&Task{Name: "web"},
&Task{Name: "web"},
&Task{Name: "web", Leader: true},
&Task{Name: "web", Leader: true},
&Task{},
},
RestartPolicy: &RestartPolicy{
@ -442,7 +442,10 @@ func TestTaskGroup_Validate(t *testing.T) {
if !strings.Contains(mErr.Errors[2].Error(), "Task 3 missing name") {
t.Fatalf("err: %s", err)
}
if !strings.Contains(mErr.Errors[3].Error(), "Task web validation failed") {
if !strings.Contains(mErr.Errors[3].Error(), "Only one task may be marked as leader") {
t.Fatalf("err: %s", err)
}
if !strings.Contains(mErr.Errors[4].Error(), "Task web validation failed") {
t.Fatalf("err: %s", err)
}
}

View File

@ -267,6 +267,8 @@ be specified using the `?region=` query parameter.
* `Failed Artifact Download` - Artifact(s) specified in the task failed to download.
* `Restart Signaled` - The task was signalled to be restarted.
* `Signaling` - The task is being sent a signal.
* `Sibling task failed` - A task in the same task group failed.
* `Sibling Task Failed` - A task in the same task group failed.
* `Leader Task Dead` - The group's leader task is dead.
* `Driver` - A message from the driver.
Depending on the type the event will have applicable annotations.

View File

@ -358,6 +358,10 @@ The `Task` object supports the following keys:
sends `SIGTERM` if the task doesn't die after the `KillTimeout` duration has
elapsed. The default `KillTimeout` is 5 seconds.
* `leader` - Specifies whether the task is the leader task of the task group. If
set to true, when the leader task completes, all other tasks within the task
group will be gracefully shutdown.
* `LogConfig` - This allows configuring log rotation for the `stdout` and `stderr`
buffers of a Task. See the log rotation reference below for more details.
@ -681,6 +685,9 @@ README][ct].
"SIGUSR1" or "SIGINT". This option is required if the `ChangeMode` is
`signal`.
* `perms` - Specifies the rendered template's permissions. File permissions are
given as an octal representation of the Unix file permissions rwxrwxrwx.
* `Splay` - Specifies a random amount of time to wait between 0ms and the given
splay value before invoking the change mode. Should be specified in
nanoseconds.

View File

@ -52,6 +52,10 @@ job "docs" {
If the task does not exit before the configured timeout, `SIGKILL` is sent to
the task.
- `leader` `(bool: false)` - Specifies whether the task is the leader task of
the task group. If set to true, when the leader task completes, all other
tasks within the task group will be gracefully shutdown.
- `logs` <code>([Logs][]: nil)</code> - Specifies logging configuration for the
`stdout` and `stderr` of the task.

View File

@ -79,9 +79,9 @@ $ nomad logs -stderr 04d9627d
While the logs command works well for quickly accessing application logs, it
generally does not scale to large systems or systems that produce a lot of log
output, especially for the long-term storage of logs. Nomad only retains log
files for a configurable period of time, so chatty applications should use a
better log retention strategy.
output, especially for the long-term storage of logs. Nomad's retention of log
files is best effort, so chatty applications should use a better log retention
strategy.
Since applications log to the `alloc/` directory, all tasks within the same task
group have access to each others logs. Thus it is possible to have a task group
@ -91,6 +91,10 @@ as follows:
group "my-group" {
task "server" {
# ...
# Setting the server task as the leader of the task group allows us to
# signal the log shipper task to gracefully shutdown when the server exits.
leader = true
}
task "log-shipper" {
@ -103,3 +107,11 @@ In the above example, the `server` task is the application that should be run
and will be producing the logs. The `log-shipper` reads those logs from the
`alloc/logs/` directory and sends them to a longer-term storage solution such as
Amazon S3 or an internal log aggregation system.
When using the log shipper pattern, especially for batch jobs, the main task
should be marked as the [leader task](/docs/job-specification/task.html#leader).
By marking the main task as a leader, when the task completes all other tasks
within the group will be gracefully shutdown. This allows the log shipper to
finish sending any logs and then exit itself. The log shipper should set a
high enough [`kill_timeout`](/docs/job-specification/task.html#kill_timeout)
such that it can ship any remaining logs before exiting.