From 31953f55e4d857f21cc8f731ccdaa975a45d5942 Mon Sep 17 00:00:00 2001 From: Halil Kaskavalci Date: Thu, 17 Mar 2016 15:47:46 +0200 Subject: [PATCH 01/13] Update confusing terms in scheduling.html.md I think it creates confusion for new learners to call clients as nodes. A node may not have Nomad agent running and jobs can only be scheduled to clients. --- website/source/docs/internals/scheduling.html.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/website/source/docs/internals/scheduling.html.md b/website/source/docs/internals/scheduling.html.md index 328502c6c..18b467dd5 100644 --- a/website/source/docs/internals/scheduling.html.md +++ b/website/source/docs/internals/scheduling.html.md @@ -28,8 +28,8 @@ spelunking through the source code. There are four primary "nouns" in Nomad; jobs, nodes, allocations, and evaluations. Jobs are submitted by users and represent a _desired state_. A job is a declarative description -of tasks to run which are bounded by constraints and require resources. Nodes are the servers -in the clusters that tasks can be scheduled on. The mapping of tasks in a job to nodes is done +of tasks to run which are bounded by constraints and require resources. Clients are nodes in the cluster +with Nomad agent that tasks can be scheduled on. The mapping of tasks in a job to clients is done using allocations. An allocation is used to declare that a set of tasks in a job should be run on a particular node. Scheduling is the process of determining the appropriate allocations and is done as part of an evaluation. From 7ed107d04ebf51d65811a273f52b393cb6fdbf1d Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 17 Mar 2016 16:48:45 -0700 Subject: [PATCH 02/13] Always defaulting to prefix match --- command/alloc_status.go | 81 ++++++++++++++++++-------------------- command/fs_cat.go | 81 ++++++++++++++++++-------------------- command/fs_ls.go | 81 ++++++++++++++++++-------------------- command/fs_stat.go | 81 ++++++++++++++++++-------------------- command/node_drain.go | 87 ++++++++++++++++++++--------------------- command/node_status.go | 87 ++++++++++++++++++++--------------------- command/status.go | 53 ++++++++++++------------- 7 files changed, 265 insertions(+), 286 deletions(-) diff --git a/command/alloc_status.go b/command/alloc_status.go index 49e3844fa..5e6807e49 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -75,50 +75,47 @@ func (c *AllocStatusCommand) Run(args []string) int { } // Query the allocation info - alloc, _, err := client.Allocations().Info(allocID, nil) - if err != nil { - if len(allocID) == 1 { - c.Ui.Error(fmt.Sprintf("Identifier must contain at least two characters.")) - return 1 - } - if len(allocID)%2 == 1 { - // Identifiers must be of even length, so we strip off the last byte - // to provide a consistent user experience. - allocID = allocID[:len(allocID)-1] - } + if len(allocID) == 1 { + c.Ui.Error(fmt.Sprintf("Identifier must contain at least two characters.")) + return 1 + } + if len(allocID)%2 == 1 { + // Identifiers must be of even length, so we strip off the last byte + // to provide a consistent user experience. 
+ allocID = allocID[:len(allocID)-1] + } - allocs, _, err := client.Allocations().PrefixList(allocID) - if err != nil { - c.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) - return 1 - } - if len(allocs) == 0 { - c.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID)) - return 1 - } - if len(allocs) > 1 { - // Format the allocs - out := make([]string, len(allocs)+1) - out[0] = "ID|Eval ID|Job ID|Task Group|Desired Status|Client Status" - for i, alloc := range allocs { - out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%s", - limit(alloc.ID, length), - limit(alloc.EvalID, length), - alloc.JobID, - alloc.TaskGroup, - alloc.DesiredStatus, - alloc.ClientStatus, - ) - } - c.Ui.Output(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", formatList(out))) - return 0 - } - // Prefix lookup matched a single allocation - alloc, _, err = client.Allocations().Info(allocs[0].ID, nil) - if err != nil { - c.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) - return 1 + allocs, _, err := client.Allocations().PrefixList(allocID) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) + return 1 + } + if len(allocs) == 0 { + c.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID)) + return 1 + } + if len(allocs) > 1 { + // Format the allocs + out := make([]string, len(allocs)+1) + out[0] = "ID|Eval ID|Job ID|Task Group|Desired Status|Client Status" + for i, alloc := range allocs { + out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%s", + limit(alloc.ID, length), + limit(alloc.EvalID, length), + alloc.JobID, + alloc.TaskGroup, + alloc.DesiredStatus, + alloc.ClientStatus, + ) } + c.Ui.Output(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", formatList(out))) + return 0 + } + // Prefix lookup matched a single allocation + alloc, _, err := client.Allocations().Info(allocs[0].ID, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) + return 1 } // Format the allocation data diff --git a/command/fs_cat.go b/command/fs_cat.go index 9a7616ab0..2322846d8 100644 --- a/command/fs_cat.go +++ b/command/fs_cat.go @@ -68,50 +68,47 @@ func (f *FSCatCommand) Run(args []string) int { length = fullId } // Query the allocation info - alloc, _, err := client.Allocations().Info(allocID, nil) - if err != nil { - if len(allocID) == 1 { - f.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters.")) - return 1 - } - if len(allocID)%2 == 1 { - // Identifiers must be of even length, so we strip off the last byte - // to provide a consistent user experience. - allocID = allocID[:len(allocID)-1] - } + if len(allocID) == 1 { + f.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters.")) + return 1 + } + if len(allocID)%2 == 1 { + // Identifiers must be of even length, so we strip off the last byte + // to provide a consistent user experience. 
+ allocID = allocID[:len(allocID)-1] + } - allocs, _, err := client.Allocations().PrefixList(allocID) - if err != nil { - f.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) - return 1 - } - if len(allocs) == 0 { - f.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID)) - return 1 - } - if len(allocs) > 1 { - // Format the allocs - out := make([]string, len(allocs)+1) - out[0] = "ID|Eval ID|Job ID|Task Group|Desired Status|Client Status" - for i, alloc := range allocs { - out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%s", - limit(alloc.ID, length), - limit(alloc.EvalID, length), - alloc.JobID, - alloc.TaskGroup, - alloc.DesiredStatus, - alloc.ClientStatus, - ) - } - f.Ui.Output(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", formatList(out))) - return 0 - } - // Prefix lookup matched a single allocation - alloc, _, err = client.Allocations().Info(allocs[0].ID, nil) - if err != nil { - f.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) - return 1 + allocs, _, err := client.Allocations().PrefixList(allocID) + if err != nil { + f.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) + return 1 + } + if len(allocs) == 0 { + f.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID)) + return 1 + } + if len(allocs) > 1 { + // Format the allocs + out := make([]string, len(allocs)+1) + out[0] = "ID|Eval ID|Job ID|Task Group|Desired Status|Client Status" + for i, alloc := range allocs { + out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%s", + limit(alloc.ID, length), + limit(alloc.EvalID, length), + alloc.JobID, + alloc.TaskGroup, + alloc.DesiredStatus, + alloc.ClientStatus, + ) } + f.Ui.Output(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", formatList(out))) + return 0 + } + // Prefix lookup matched a single allocation + alloc, _, err := client.Allocations().Info(allocs[0].ID, nil) + if err != nil { + f.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) + return 1 } if alloc.DesiredStatus == "failed" { diff --git a/command/fs_ls.go b/command/fs_ls.go index 9329f1dd2..a15373d95 100644 --- a/command/fs_ls.go +++ b/command/fs_ls.go @@ -74,50 +74,47 @@ func (f *FSListCommand) Run(args []string) int { length = fullId } // Query the allocation info - alloc, _, err := client.Allocations().Info(allocID, nil) - if err != nil { - if len(allocID) == 1 { - f.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters.")) - return 1 - } - if len(allocID)%2 == 1 { - // Identifiers must be of even length, so we strip off the last byte - // to provide a consistent user experience. - allocID = allocID[:len(allocID)-1] - } + if len(allocID) == 1 { + f.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters.")) + return 1 + } + if len(allocID)%2 == 1 { + // Identifiers must be of even length, so we strip off the last byte + // to provide a consistent user experience. 
+ allocID = allocID[:len(allocID)-1] + } - allocs, _, err := client.Allocations().PrefixList(allocID) - if err != nil { - f.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) - return 1 - } - if len(allocs) == 0 { - f.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID)) - return 1 - } - if len(allocs) > 1 { - // Format the allocs - out := make([]string, len(allocs)+1) - out[0] = "ID|Eval ID|Job ID|Task Group|Desired Status|Client Status" - for i, alloc := range allocs { - out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%s", - limit(alloc.ID, length), - limit(alloc.EvalID, length), - alloc.JobID, - alloc.TaskGroup, - alloc.DesiredStatus, - alloc.ClientStatus, - ) - } - f.Ui.Output(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", formatList(out))) - return 0 - } - // Prefix lookup matched a single allocation - alloc, _, err = client.Allocations().Info(allocs[0].ID, nil) - if err != nil { - f.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) - return 1 + allocs, _, err := client.Allocations().PrefixList(allocID) + if err != nil { + f.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) + return 1 + } + if len(allocs) == 0 { + f.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID)) + return 1 + } + if len(allocs) > 1 { + // Format the allocs + out := make([]string, len(allocs)+1) + out[0] = "ID|Eval ID|Job ID|Task Group|Desired Status|Client Status" + for i, alloc := range allocs { + out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%s", + limit(alloc.ID, length), + limit(alloc.EvalID, length), + alloc.JobID, + alloc.TaskGroup, + alloc.DesiredStatus, + alloc.ClientStatus, + ) } + f.Ui.Output(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", formatList(out))) + return 0 + } + // Prefix lookup matched a single allocation + alloc, _, err := client.Allocations().Info(allocs[0].ID, nil) + if err != nil { + f.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) + return 1 } if alloc.DesiredStatus == "failed" { diff --git a/command/fs_stat.go b/command/fs_stat.go index 9c163f803..4f20a4a47 100644 --- a/command/fs_stat.go +++ b/command/fs_stat.go @@ -73,50 +73,47 @@ func (f *FSStatCommand) Run(args []string) int { length = fullId } // Query the allocation info - alloc, _, err := client.Allocations().Info(allocID, nil) - if err != nil { - if len(allocID) == 1 { - f.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters.")) - return 1 - } - if len(allocID)%2 == 1 { - // Identifiers must be of even length, so we strip off the last byte - // to provide a consistent user experience. - allocID = allocID[:len(allocID)-1] - } + if len(allocID) == 1 { + f.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters.")) + return 1 + } + if len(allocID)%2 == 1 { + // Identifiers must be of even length, so we strip off the last byte + // to provide a consistent user experience. 
+ allocID = allocID[:len(allocID)-1] + } - allocs, _, err := client.Allocations().PrefixList(allocID) - if err != nil { - f.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) - return 1 - } - if len(allocs) == 0 { - f.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID)) - return 1 - } - if len(allocs) > 1 { - // Format the allocs - out := make([]string, len(allocs)+1) - out[0] = "ID|Eval ID|Job ID|Task Group|Desired Status|Client Status" - for i, alloc := range allocs { - out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%s", - limit(alloc.ID, length), - limit(alloc.EvalID, length), - alloc.JobID, - alloc.TaskGroup, - alloc.DesiredStatus, - alloc.ClientStatus, - ) - } - f.Ui.Output(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", formatList(out))) - return 0 - } - // Prefix lookup matched a single allocation - alloc, _, err = client.Allocations().Info(allocs[0].ID, nil) - if err != nil { - f.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) - return 1 + allocs, _, err := client.Allocations().PrefixList(allocID) + if err != nil { + f.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) + return 1 + } + if len(allocs) == 0 { + f.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID)) + return 1 + } + if len(allocs) > 1 { + // Format the allocs + out := make([]string, len(allocs)+1) + out[0] = "ID|Eval ID|Job ID|Task Group|Desired Status|Client Status" + for i, alloc := range allocs { + out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%s", + limit(alloc.ID, length), + limit(alloc.EvalID, length), + alloc.JobID, + alloc.TaskGroup, + alloc.DesiredStatus, + alloc.ClientStatus, + ) } + f.Ui.Output(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", formatList(out))) + return 0 + } + // Prefix lookup matched a single allocation + alloc, _, err := client.Allocations().Info(allocs[0].ID, nil) + if err != nil { + f.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) + return 1 } if alloc.DesiredStatus == "failed" { diff --git a/command/node_drain.go b/command/node_drain.go index 474c6908e..8b73ed344 100644 --- a/command/node_drain.go +++ b/command/node_drain.go @@ -69,53 +69,50 @@ func (c *NodeDrainCommand) Run(args []string) int { } // Check if node exists - node, _, err := client.Nodes().Info(nodeID, nil) - if err != nil { - if len(nodeID) == 1 { - c.Ui.Error(fmt.Sprintf("Identifier must contain at least two characters.")) - return 1 - } - if len(nodeID)%2 == 1 { - // Identifiers must be of even length, so we strip off the last byte - // to provide a consistent user experience. - nodeID = nodeID[:len(nodeID)-1] - } + if len(nodeID) == 1 { + c.Ui.Error(fmt.Sprintf("Identifier must contain at least two characters.")) + return 1 + } + if len(nodeID)%2 == 1 { + // Identifiers must be of even length, so we strip off the last byte + // to provide a consistent user experience. 
+ nodeID = nodeID[:len(nodeID)-1] + } - // Exact lookup failed, try with prefix based search - nodes, _, err := client.Nodes().PrefixList(nodeID) - if err != nil { - c.Ui.Error(fmt.Sprintf("Error toggling drain mode: %s", err)) - return 1 - } - // Return error if no nodes are found - if len(nodes) == 0 { - c.Ui.Error(fmt.Sprintf("No node(s) with prefix or id %q found", nodeID)) - return 1 - } - if len(nodes) > 1 { - // Format the nodes list that matches the prefix so that the user - // can create a more specific request - out := make([]string, len(nodes)+1) - out[0] = "ID|Datacenter|Name|Class|Drain|Status" - for i, node := range nodes { - out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%v|%s", - node.ID, - node.Datacenter, - node.Name, - node.NodeClass, - node.Drain, - node.Status) - } - // Dump the output - c.Ui.Output(fmt.Sprintf("Prefix matched multiple nodes\n\n%s", formatList(out))) - return 0 - } - // Prefix lookup matched a single node - node, _, err = client.Nodes().Info(nodes[0].ID, nil) - if err != nil { - c.Ui.Error(fmt.Sprintf("Error toggling drain mode: %s", err)) - return 1 + // Exact lookup failed, try with prefix based search + nodes, _, err := client.Nodes().PrefixList(nodeID) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error toggling drain mode: %s", err)) + return 1 + } + // Return error if no nodes are found + if len(nodes) == 0 { + c.Ui.Error(fmt.Sprintf("No node(s) with prefix or id %q found", nodeID)) + return 1 + } + if len(nodes) > 1 { + // Format the nodes list that matches the prefix so that the user + // can create a more specific request + out := make([]string, len(nodes)+1) + out[0] = "ID|Datacenter|Name|Class|Drain|Status" + for i, node := range nodes { + out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%v|%s", + node.ID, + node.Datacenter, + node.Name, + node.NodeClass, + node.Drain, + node.Status) } + // Dump the output + c.Ui.Output(fmt.Sprintf("Prefix matched multiple nodes\n\n%s", formatList(out))) + return 0 + } + // Prefix lookup matched a single node + node, _, err := client.Nodes().Info(nodes[0].ID, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error toggling drain mode: %s", err)) + return 1 } // Toggle node draining diff --git a/command/node_status.go b/command/node_status.go index 164db8678..efbbab312 100644 --- a/command/node_status.go +++ b/command/node_status.go @@ -133,53 +133,50 @@ func (c *NodeStatusCommand) Run(args []string) int { // Query the specific node nodeID := args[0] - node, _, err := client.Nodes().Info(nodeID, nil) - if err != nil { - if len(nodeID) == 1 { - c.Ui.Error(fmt.Sprintf("Identifier must contain at least two characters.")) - return 1 - } - if len(nodeID)%2 == 1 { - // Identifiers must be of even length, so we strip off the last byte - // to provide a consistent user experience. - nodeID = nodeID[:len(nodeID)-1] - } + if len(nodeID) == 1 { + c.Ui.Error(fmt.Sprintf("Identifier must contain at least two characters.")) + return 1 + } + if len(nodeID)%2 == 1 { + // Identifiers must be of even length, so we strip off the last byte + // to provide a consistent user experience. 
+ nodeID = nodeID[:len(nodeID)-1] + } - // Exact lookup failed, try with prefix based search - nodes, _, err := client.Nodes().PrefixList(nodeID) - if err != nil { - c.Ui.Error(fmt.Sprintf("Error querying node info: %s", err)) - return 1 - } - // Return error if no nodes are found - if len(nodes) == 0 { - c.Ui.Error(fmt.Sprintf("No node(s) with prefix %q found", nodeID)) - return 1 - } - if len(nodes) > 1 { - // Format the nodes list that matches the prefix so that the user - // can create a more specific request - out := make([]string, len(nodes)+1) - out[0] = "ID|DC|Name|Class|Drain|Status" - for i, node := range nodes { - out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%v|%s", - limit(node.ID, length), - node.Datacenter, - node.Name, - node.NodeClass, - node.Drain, - node.Status) - } - // Dump the output - c.Ui.Output(fmt.Sprintf("Prefix matched multiple nodes\n\n%s", formatList(out))) - return 0 - } - // Prefix lookup matched a single node - node, _, err = client.Nodes().Info(nodes[0].ID, nil) - if err != nil { - c.Ui.Error(fmt.Sprintf("Error querying node info: %s", err)) - return 1 + // Exact lookup failed, try with prefix based search + nodes, _, err := client.Nodes().PrefixList(nodeID) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying node info: %s", err)) + return 1 + } + // Return error if no nodes are found + if len(nodes) == 0 { + c.Ui.Error(fmt.Sprintf("No node(s) with prefix %q found", nodeID)) + return 1 + } + if len(nodes) > 1 { + // Format the nodes list that matches the prefix so that the user + // can create a more specific request + out := make([]string, len(nodes)+1) + out[0] = "ID|DC|Name|Class|Drain|Status" + for i, node := range nodes { + out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%v|%s", + limit(node.ID, length), + node.Datacenter, + node.Name, + node.NodeClass, + node.Drain, + node.Status) } + // Dump the output + c.Ui.Output(fmt.Sprintf("Prefix matched multiple nodes\n\n%s", formatList(out))) + return 0 + } + // Prefix lookup matched a single node + node, _, err := client.Nodes().Info(nodes[0].ID, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying node info: %s", err)) + return 1 } m := node.Attributes diff --git a/command/status.go b/command/status.go index d5d127d35..d7bcdde4a 100644 --- a/command/status.go +++ b/command/status.go @@ -105,36 +105,33 @@ func (c *StatusCommand) Run(args []string) int { // Try querying the job jobID := args[0] - job, _, err := client.Jobs().Info(jobID, nil) + jobs, _, err := client.Jobs().PrefixList(jobID) if err != nil { - jobs, _, err := client.Jobs().PrefixList(jobID) - if err != nil { - c.Ui.Error(fmt.Sprintf("Error querying job: %s", err)) - return 1 - } - if len(jobs) == 0 { - c.Ui.Error(fmt.Sprintf("No job(s) with prefix or id %q found", jobID)) - return 1 - } - if len(jobs) > 1 { - out := make([]string, len(jobs)+1) - out[0] = "ID|Type|Priority|Status" - for i, job := range jobs { - out[i+1] = fmt.Sprintf("%s|%s|%d|%s", - job.ID, - job.Type, - job.Priority, - job.Status) - } - c.Ui.Output(fmt.Sprintf("Prefix matched multiple jobs\n\n%s", formatList(out))) - return 0 - } - // Prefix lookup matched a single job - job, _, err = client.Jobs().Info(jobs[0].ID, nil) - if err != nil { - c.Ui.Error(fmt.Sprintf("Error querying job: %s", err)) - return 1 + c.Ui.Error(fmt.Sprintf("Error querying job: %s", err)) + return 1 + } + if len(jobs) == 0 { + c.Ui.Error(fmt.Sprintf("No job(s) with prefix or id %q found", jobID)) + return 1 + } + if len(jobs) > 1 { + out := make([]string, len(jobs)+1) + out[0] = "ID|Type|Priority|Status" + 
for i, job := range jobs { + out[i+1] = fmt.Sprintf("%s|%s|%d|%s", + job.ID, + job.Type, + job.Priority, + job.Status) } + c.Ui.Output(fmt.Sprintf("Prefix matched multiple jobs\n\n%s", formatList(out))) + return 0 + } + // Prefix lookup matched a single job + job, _, err := client.Jobs().Info(jobs[0].ID, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying job: %s", err)) + return 1 } // Check if it is periodic From 9f91590bbd8e0d6a424321ce04bab41521ed2ced Mon Sep 17 00:00:00 2001 From: Halil Kaskavalci Date: Fri, 18 Mar 2016 08:19:28 +0200 Subject: [PATCH 03/13] Rephrase the sentence --- website/source/docs/internals/scheduling.html.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/website/source/docs/internals/scheduling.html.md b/website/source/docs/internals/scheduling.html.md index 18b467dd5..2399e9d00 100644 --- a/website/source/docs/internals/scheduling.html.md +++ b/website/source/docs/internals/scheduling.html.md @@ -28,8 +28,8 @@ spelunking through the source code. There are four primary "nouns" in Nomad; jobs, nodes, allocations, and evaluations. Jobs are submitted by users and represent a _desired state_. A job is a declarative description -of tasks to run which are bounded by constraints and require resources. Clients are nodes in the cluster -with Nomad agent that tasks can be scheduled on. The mapping of tasks in a job to clients is done +of tasks to run which are bounded by constraints and require resources. Tasks can be scheduled on +nodes in the cluster runnaing the Nomad client. The mapping of tasks in a job to clients is done using allocations. An allocation is used to declare that a set of tasks in a job should be run on a particular node. Scheduling is the process of determining the appropriate allocations and is done as part of an evaluation. From 05fbf7292ca7b177e7404ac0cd42e2ec43022748 Mon Sep 17 00:00:00 2001 From: Halil Kaskavalci Date: Fri, 18 Mar 2016 08:25:23 +0200 Subject: [PATCH 04/13] Typo Copy & paste isn't the best practice --- website/source/docs/internals/scheduling.html.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/source/docs/internals/scheduling.html.md b/website/source/docs/internals/scheduling.html.md index 2399e9d00..f09f8634f 100644 --- a/website/source/docs/internals/scheduling.html.md +++ b/website/source/docs/internals/scheduling.html.md @@ -29,7 +29,7 @@ spelunking through the source code. There are four primary "nouns" in Nomad; jobs, nodes, allocations, and evaluations. Jobs are submitted by users and represent a _desired state_. A job is a declarative description of tasks to run which are bounded by constraints and require resources. Tasks can be scheduled on -nodes in the cluster runnaing the Nomad client. The mapping of tasks in a job to clients is done +nodes in the cluster running the Nomad client. The mapping of tasks in a job to clients is done using allocations. An allocation is used to declare that a set of tasks in a job should be run on a particular node. Scheduling is the process of determining the appropriate allocations and is done as part of an evaluation. 
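
Before the executor patches below, it is worth seeing patch 02's lookup flow in one piece, since the same logic is repeated across alloc_status, fs_cat, fs_ls, fs_stat, node_drain, node_status, and status. The following is a condensed, hedged sketch for orientation only: lookupAlloc is an invented helper name and returning an error on multiple matches is a simplification, while the PrefixList and Info calls are the ones used in the hunks above.

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/hashicorp/nomad/api"
)

// lookupAlloc condenses the shared flow from the hunks above: validate the
// identifier, force an even-length prefix, list matches, then disambiguate.
func lookupAlloc(client *api.Client, allocID string) (*api.Allocation, error) {
	if len(allocID) == 1 {
		return nil, fmt.Errorf("identifier must contain at least two characters")
	}
	if len(allocID)%2 == 1 {
		// Identifiers must be of even length, so strip off the last byte
		// to provide a consistent user experience.
		allocID = allocID[:len(allocID)-1]
	}
	allocs, _, err := client.Allocations().PrefixList(allocID)
	if err != nil {
		return nil, fmt.Errorf("error querying allocation: %v", err)
	}
	switch len(allocs) {
	case 0:
		return nil, fmt.Errorf("no allocation(s) with prefix or id %q found", allocID)
	case 1:
		// A single match: fetch the full allocation record.
		alloc, _, err := client.Allocations().Info(allocs[0].ID, nil)
		return alloc, err
	default:
		// The real commands render a table of candidates here; an error
		// stands in for that prompt to narrow the prefix.
		return nil, fmt.Errorf("prefix %q matched %d allocations", allocID, len(allocs))
	}
}

func main() {
	// DefaultConfig points at the local agent; the prefix comes from argv.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	alloc, err := lookupAlloc(client, os.Args[1])
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(alloc.ID, alloc.ClientStatus)
}
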
From 3c7b83b39366e0bcb82aeed3c35062fb0b8c803f Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 17 Mar 2016 02:53:31 -0700 Subject: [PATCH 05/13] Introduced a method in executor to launch syslog server --- client/driver/docker.go | 28 +-- client/driver/docker_test.go | 4 +- client/driver/exec.go | 21 +- client/driver/executor/executor.go | 230 ++++++++++++------ client/driver/executor/executor_linux.go | 18 +- client/driver/executor/executor_posix.go | 44 ++++ client/driver/executor/executor_test.go | 47 ++-- client/driver/executor/executor_windows.go | 5 + client/driver/executor_plugin.go | 36 +++ client/driver/java.go | 21 +- .../driver/logging/syslog_server_windows.go | 10 + client/driver/qemu.go | 10 +- client/driver/raw_exec.go | 10 +- client/driver/rkt.go | 11 +- client/driver/structs/structs.go | 4 + 15 files changed, 348 insertions(+), 151 deletions(-) create mode 100644 client/driver/executor/executor_posix.go create mode 100644 client/driver/executor/executor_windows.go create mode 100644 client/driver/logging/syslog_server_windows.go diff --git a/client/driver/docker.go b/client/driver/docker.go index 7c04daacb..5829b7ad5 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -20,7 +20,7 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/client/driver/logging" + "github.com/hashicorp/nomad/client/driver/executor" cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" @@ -101,7 +101,7 @@ type dockerPID struct { type DockerHandle struct { pluginClient *plugin.Client - logCollector logging.LogCollector + executor executor.Executor client *docker.Client logger *log.Logger cleanupContainer bool @@ -533,23 +533,23 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle if err != nil { return nil, fmt.Errorf("unable to find the nomad binary: %v", err) } - pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-syslog-collector.out", task.Name)) + pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name)) pluginConfig := &plugin.ClientConfig{ - Cmd: exec.Command(bin, "syslog", pluginLogFile), + Cmd: exec.Command(bin, "executor", pluginLogFile), } - logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config) + exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { return nil, err } - logCollectorCtx := &logging.LogCollectorContext{ - TaskName: task.Name, + executorCtx := &executor.ExecutorContext{ + TaskEnv: d.taskEnv, + Task: task, AllocDir: ctx.AllocDir, - LogConfig: task.LogConfig, PortLowerBound: d.config.ClientMinPort, PortUpperBound: d.config.ClientMaxPort, } - ss, err := logCollector.LaunchCollector(logCollectorCtx) + ss, err := exec.LaunchSyslogServer(executorCtx) if err != nil { return nil, fmt.Errorf("failed to start syslog collector: %v", err) } @@ -629,7 +629,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle maxKill := d.DriverContext.config.MaxKillTimeout h := &DockerHandle{ client: client, - logCollector: logCollector, + executor: exec, pluginClient: pluginClient, cleanupContainer: cleanupContainer, cleanupImage: cleanupImage, @@ -686,7 +686,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er if !found { return nil, fmt.Errorf("Failed to find container %s: %v", pid.ContainerID, 
err) } - logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config) + exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { d.logger.Printf("[INFO] driver.docker: couldn't re-attach to the plugin process: %v", err) if e := client.StopContainer(pid.ContainerID, uint(pid.KillTimeout*time.Second)); e != nil { @@ -698,7 +698,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er // Return a driver handle h := &DockerHandle{ client: client, - logCollector: logCollector, + executor: exec, pluginClient: pluginClient, cleanupContainer: cleanupContainer, cleanupImage: cleanupImage, @@ -743,7 +743,7 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult { func (h *DockerHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) - if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil { + if err := h.executor.UpdateTask(task); err != nil { h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err) } @@ -824,7 +824,7 @@ func (h *DockerHandle) run() { close(h.waitCh) // Shutdown the syslog collector - if err := h.logCollector.Exit(); err != nil { + if err := h.executor.Exit(); err != nil { h.logger.Printf("[ERR] driver.docker: failed to kill the syslog collector: %v", err) } h.pluginClient.Kill() diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 9e641c219..0410855d2 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -145,7 +145,7 @@ func TestDockerDriver_Handle(t *testing.T) { pluginConfig := &plugin.ClientConfig{ Cmd: exec.Command(bin, "syslog", f.Name()), } - logCollector, pluginClient, err := createLogCollector(pluginConfig, os.Stdout, &config.Config{}) + exec, pluginClient, err := createExecutor(pluginConfig, os.Stdout, &config.Config{}) if err != nil { t.Fatalf("got an err: %v", err) } @@ -154,7 +154,7 @@ func TestDockerDriver_Handle(t *testing.T) { h := &DockerHandle{ version: "version", imageID: "imageid", - logCollector: logCollector, + executor: exec, pluginClient: pluginClient, containerID: "containerid", killTimeout: 5 * time.Nanosecond, diff --git a/client/driver/exec.go b/client/driver/exec.go index c832ab9de..c3e99a3e8 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -100,16 +100,17 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - AllocDir: ctx.AllocDir, - TaskName: task.Name, - TaskResources: task.Resources, - LogConfig: task.LogConfig, - ResourceLimits: true, - FSIsolation: true, - UnprivilegedUser: true, + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + Task: task, } - ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx) + ps, err := exec.LaunchCmd(&executor.ExecCommand{ + Cmd: command, + Args: driverConfig.Args, + FSIsolation: true, + ResourceLimits: true, + User: cstructs.DefaultUnpriviledgedUser, + }, executorCtx) if err != nil { pluginClient.Kill() return nil, fmt.Errorf("error starting process via the plugin: %v", err) @@ -217,7 +218,7 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult { func (h *execHandle) Update(task *structs.Task) error { // Store the updated kill timeout. 
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) - h.executor.UpdateLogConfig(task.LogConfig) + h.executor.UpdateTask(task) // Update is not possible return nil diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 3c0596bdb..9ace1cb72 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -2,7 +2,9 @@ package executor import ( "fmt" + "io/ioutil" "log" + "net" "os" "os/exec" "runtime" @@ -21,6 +23,18 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +// Executor is the interface which allows a driver to launch and supervise +// a process +type Executor interface { + LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) + LaunchSyslogServer(ctx *ExecutorContext) (*SyslogServerState, error) + Wait() (*ProcessState, error) + ShutDown() error + Exit() error + UpdateLogConfig(logConfig *structs.LogConfig) error + UpdateTask(task *structs.Task) error +} + // ExecutorContext holds context to configure the command user // wants to run and isolate it type ExecutorContext struct { @@ -31,33 +45,36 @@ type ExecutorContext struct { // the task AllocDir *allocdir.AllocDir - // TaskName is the name of the Task - TaskName string + // Task is the task whose executor is being launched + Task *structs.Task - // TaskResources are the resource constraints for the Task - TaskResources *structs.Resources + // PortUpperBound is the upper bound of the ports that we can use to start + // the syslog server + PortUpperBound uint - // FSIsolation is a flag for drivers to impose file system - // isolation on certain platforms - FSIsolation bool - - // ResourceLimits is a flag for drivers to impose resource - // contraints on a Task on certain platforms - ResourceLimits bool - - // UnprivilegedUser is a flag for drivers to make the process - // run as nobody - UnprivilegedUser bool - - // LogConfig provides the configuration related to log rotation - LogConfig *structs.LogConfig + // PortLowerBound is the lower bound of the ports that we can use to start + // the syslog server + PortLowerBound uint } -// ExecCommand holds the user command and args. It's a lightweight replacement -// of exec.Cmd for serialization purposes. +// ExecCommand holds the user command, args, and other isolation related +// settings. type ExecCommand struct { - Cmd string + // Cmd is the command that the user wants to run. + Cmd string + + // Args is the args of the command that the user wants to run. Args []string + + // FSIsolation determines whether the command would be run in a chroot. + FSIsolation bool + + // User is the user which the executor uses to run the command. + User string + + // ResourceLimits determines whether resource limits are enforced by the + // executor. + ResourceLimits bool } // ProcessState holds information about the state of a user process. 
@@ -69,37 +86,44 @@ type ProcessState struct { Time time.Time } -// Executor is the interface which allows a driver to launch and supervise -// a process -type Executor interface { - LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) - Wait() (*ProcessState, error) - ShutDown() error - Exit() error - UpdateLogConfig(logConfig *structs.LogConfig) error +// SyslogServerState holds the address and islation information of a launched +// syslog server +type SyslogServerState struct { + IsolationConfig *cstructs.IsolationConfig + Addr string } // UniversalExecutor is an implementation of the Executor which launches and // supervises processes. In addition to process supervision it provides resource // and file system isolation type UniversalExecutor struct { - cmd exec.Cmd - ctx *ExecutorContext + cmd exec.Cmd + ctx *ExecutorContext + command *ExecCommand taskDir string - groups *cgroupConfig.Cgroup exitState *ProcessState processExited chan interface{} - lre *logging.FileRotator - lro *logging.FileRotator + + lre *logging.FileRotator + lro *logging.FileRotator + rotatorLock sync.Mutex + + syslogServer *logging.SyslogServer + syslogChan chan *logging.SyslogMessage + + groups *cgroupConfig.Cgroup + cgLock sync.Mutex logger *log.Logger - lock sync.Mutex } // NewExecutor returns an Executor func NewExecutor(logger *log.Logger) Executor { - return &UniversalExecutor{logger: logger, processExited: make(chan interface{})} + return &UniversalExecutor{ + logger: logger, + processExited: make(chan interface{}), + } } // LaunchCmd launches a process and returns it's state. It also configures an @@ -108,6 +132,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext e.logger.Printf("[DEBUG] executor: launching command %v %v", command.Cmd, strings.Join(command.Args, " ")) e.ctx = ctx + e.command = command // configuring the task dir if err := e.configureTaskDir(); err != nil { @@ -121,29 +146,18 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext } // setting the user of the process - if e.ctx.UnprivilegedUser { - if err := e.runAs("nobody"); err != nil { + if command.User != "" { + if err := e.runAs(command.User); err != nil { return nil, err } } - logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) - lro, err := logging.NewFileRotator(ctx.AllocDir.LogDir(), fmt.Sprintf("%v.stdout", ctx.TaskName), - ctx.LogConfig.MaxFiles, logFileSize, e.logger) - - if err != nil { - return nil, fmt.Errorf("error creating log rotator for stdout of task %v", err) + // Setup the loggers + if err := e.configureLoggers(); err != nil { + return nil, err } - e.cmd.Stdout = lro - e.lro = lro - - lre, err := logging.NewFileRotator(ctx.AllocDir.LogDir(), fmt.Sprintf("%v.stderr", ctx.TaskName), - ctx.LogConfig.MaxFiles, logFileSize, e.logger) - if err != nil { - return nil, fmt.Errorf("error creating log rotator for stderr of task %v", err) - } - e.cmd.Stderr = lre - e.lre = lre + e.cmd.Stdout = e.lro + e.cmd.Stderr = e.lre // setting the env, path and args for the command e.ctx.TaskEnv.Build() @@ -165,6 +179,32 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil } +// configureLoggers sets up the standard out/error file rotators +func (e *UniversalExecutor) configureLoggers() error { + e.rotatorLock.Lock() + defer e.rotatorLock.Unlock() + + logFileSize := int64(e.ctx.Task.LogConfig.MaxFileSizeMB * 1024 
* 1024) + if e.lro == nil { + lro, err := logging.NewFileRotator(e.ctx.AllocDir.LogDir(), fmt.Sprintf("%v.stdout", e.ctx.Task.Name), + e.ctx.Task.LogConfig.MaxFiles, logFileSize, e.logger) + if err != nil { + return err + } + e.lro = lro + } + + if e.lre == nil { + lre, err := logging.NewFileRotator(e.ctx.AllocDir.LogDir(), fmt.Sprintf("%v.stderr", e.ctx.Task.Name), + e.ctx.Task.LogConfig.MaxFiles, logFileSize, e.logger) + if err != nil { + return err + } + e.lre = lre + } + return nil +} + // Wait waits until a process has exited and returns it's exitcode and errors func (e *UniversalExecutor) Wait() (*ProcessState, error) { <-e.processExited @@ -173,7 +213,7 @@ func (e *UniversalExecutor) Wait() (*ProcessState, error) { // UpdateLogConfig updates the log configuration func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error { - e.ctx.LogConfig = logConfig + e.ctx.Task.LogConfig = logConfig if e.lro == nil { return fmt.Errorf("log rotator for stdout doesn't exist") } @@ -188,9 +228,22 @@ func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error return nil } +func (e *UniversalExecutor) UpdateTask(task *structs.Task) error { + e.ctx.Task = task + fileSize := int64(task.LogConfig.MaxFileSizeMB * 1024 * 1024) + e.lro.MaxFiles = task.LogConfig.MaxFiles + e.lro.FileSize = fileSize + e.lre.MaxFiles = task.LogConfig.MaxFiles + e.lre.FileSize = fileSize + return nil +} + func (e *UniversalExecutor) wait() { defer close(e.processExited) err := e.cmd.Wait() + if e.syslogServer != nil { + e.syslogServer.Shutdown() + } e.lre.Close() e.lro.Close() if err == nil { @@ -203,14 +256,6 @@ func (e *UniversalExecutor) wait() { exitCode = status.ExitStatus() } } - if e.ctx.FSIsolation { - e.removeChrootMounts() - } - if e.ctx.ResourceLimits { - e.lock.Lock() - DestroyCgroup(e.groups) - e.lock.Unlock() - } e.exitState = &ProcessState{Pid: 0, ExitCode: exitCode, Time: time.Now()} } @@ -224,7 +269,7 @@ var ( // process func (e *UniversalExecutor) Exit() error { var merr multierror.Error - if e.cmd.Process != nil { + if e.command != nil && e.cmd.Process != nil { proc, err := os.FindProcess(e.cmd.Process.Pid) if err != nil { e.logger.Printf("[ERR] executor: can't find process with pid: %v, err: %v", @@ -235,17 +280,17 @@ func (e *UniversalExecutor) Exit() error { } } - if e.ctx.FSIsolation { + if e.command != nil && e.command.FSIsolation { if err := e.removeChrootMounts(); err != nil { merr.Errors = append(merr.Errors, err) } } - if e.ctx.ResourceLimits { - e.lock.Lock() + if e.command != nil && e.command.ResourceLimits { + e.cgLock.Lock() if err := DestroyCgroup(e.groups); err != nil { merr.Errors = append(merr.Errors, err) } - e.lock.Unlock() + e.cgLock.Unlock() } return merr.ErrorOrNil() } @@ -270,10 +315,10 @@ func (e *UniversalExecutor) ShutDown() error { // configureTaskDir sets the task dir in the executor func (e *UniversalExecutor) configureTaskDir() error { - taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.TaskName] + taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.Task.Name] e.taskDir = taskDir if !ok { - return fmt.Errorf("couldn't find task directory for task %v", e.ctx.TaskName) + return fmt.Errorf("couldn't find task directory for task %v", e.ctx.Task.Name) } e.cmd.Dir = taskDir return nil @@ -299,3 +344,48 @@ func (e *UniversalExecutor) makeExecutablePosix(binPath string) error { } return nil } + +// getFreePort returns a free port ready to be listened on between upper and +// lower bounds +func (e *UniversalExecutor) getListener(lowerBound uint, upperBound 
uint) (net.Listener, error) { + if runtime.GOOS == "windows" { + return e.listenerTCP(lowerBound, upperBound) + } + + return e.listenerUnix() +} + +// listenerTCP creates a TCP listener using an unused port between an upper and +// lower bound +func (e *UniversalExecutor) listenerTCP(lowerBound uint, upperBound uint) (net.Listener, error) { + for i := lowerBound; i <= upperBound; i++ { + addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i)) + if err != nil { + return nil, err + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + continue + } + return l, nil + } + return nil, fmt.Errorf("No free port found") +} + +// listenerUnix creates a Unix domain socket +func (e *UniversalExecutor) listenerUnix() (net.Listener, error) { + f, err := ioutil.TempFile("", "plugin") + if err != nil { + return nil, err + } + path := f.Name() + + if err := f.Close(); err != nil { + return nil, err + } + if err := os.Remove(path); err != nil { + return nil, err + } + + return net.Listen("unix", path) +} diff --git a/client/driver/executor/executor_linux.go b/client/driver/executor/executor_linux.go index 17a83d5d7..d6c4cf38b 100644 --- a/client/driver/executor/executor_linux.go +++ b/client/driver/executor/executor_linux.go @@ -38,7 +38,7 @@ var ( func (e *UniversalExecutor) makeExecutable(binPath string) error { path := binPath - if e.ctx.FSIsolation { + if e.command.FSIsolation { // The path must be relative the chroot path = filepath.Join(e.taskDir, binPath) } else if !filepath.IsAbs(binPath) { @@ -50,14 +50,14 @@ func (e *UniversalExecutor) makeExecutable(binPath string) error { // configureIsolation configures chroot and creates cgroups func (e *UniversalExecutor) configureIsolation() error { - if e.ctx.FSIsolation { + if e.command.FSIsolation { if err := e.configureChroot(); err != nil { return err } } - if e.ctx.ResourceLimits { - if err := e.configureCgroups(e.ctx.TaskResources); err != nil { + if e.command.ResourceLimits { + if err := e.configureCgroups(e.ctx.Task.Resources); err != nil { return fmt.Errorf("error creating cgroups: %v", err) } if err := e.applyLimits(os.Getpid()); err != nil { @@ -75,7 +75,7 @@ func (e *UniversalExecutor) configureIsolation() error { // applyLimits puts a process in a pre-configured cgroup func (e *UniversalExecutor) applyLimits(pid int) error { - if !e.ctx.ResourceLimits { + if !e.command.ResourceLimits { return nil } @@ -167,11 +167,11 @@ func (e *UniversalExecutor) runAs(userid string) error { // configureChroot configures a chroot func (e *UniversalExecutor) configureChroot() error { allocDir := e.ctx.AllocDir - if err := allocDir.MountSharedDir(e.ctx.TaskName); err != nil { + if err := allocDir.MountSharedDir(e.ctx.Task.Name); err != nil { return err } - if err := allocDir.Embed(e.ctx.TaskName, chrootEnv); err != nil { + if err := allocDir.Embed(e.ctx.Task.Name, chrootEnv); err != nil { return err } @@ -195,8 +195,8 @@ func (e *UniversalExecutor) configureChroot() error { // should be called when tearing down the task. 
func (e *UniversalExecutor) removeChrootMounts() error { // Prevent a race between Wait/ForceStop - e.lock.Lock() - defer e.lock.Unlock() + e.cgLock.Lock() + defer e.cgLock.Unlock() return e.ctx.AllocDir.UnmountAll() } diff --git a/client/driver/executor/executor_posix.go b/client/driver/executor/executor_posix.go new file mode 100644 index 000000000..4be468b2c --- /dev/null +++ b/client/driver/executor/executor_posix.go @@ -0,0 +1,44 @@ +// +build !windows + +package executor + +import ( + "fmt" + "io" + "log/syslog" + + "github.com/hashicorp/nomad/client/driver/logging" +) + +func (e *UniversalExecutor) LaunchSyslogServer(ctx *ExecutorContext) (*SyslogServerState, error) { + e.ctx = ctx + e.syslogChan = make(chan *logging.SyslogMessage, 2048) + l, err := e.getListener(e.ctx.PortLowerBound, e.ctx.PortUpperBound) + if err != nil { + return nil, err + } + e.logger.Printf("[DEBUG] sylog-server: launching syslog server on addr: %v", l.Addr().String()) + if err := e.configureLoggers(); err != nil { + return nil, err + } + + e.syslogServer = logging.NewSyslogServer(l, e.syslogChan, e.logger) + go e.syslogServer.Start() + go e.collectLogs(e.lre, e.lro) + syslogAddr := fmt.Sprintf("%s://%s", l.Addr().Network(), l.Addr().String()) + return &SyslogServerState{Addr: syslogAddr}, nil +} + +func (e *UniversalExecutor) collectLogs(we io.Writer, wo io.Writer) { + for logParts := range e.syslogChan { + // If the severity of the log line is err then we write to stderr + // otherwise all messages go to stdout + if logParts.Severity == syslog.LOG_ERR { + e.lre.Write(logParts.Message) + e.lre.Write([]byte{'\n'}) + } else { + e.lro.Write(logParts.Message) + e.lro.Write([]byte{'\n'}) + } + } +} diff --git a/client/driver/executor/executor_test.go b/client/driver/executor/executor_test.go index 2c45d0b45..8798170e7 100644 --- a/client/driver/executor/executor_test.go +++ b/client/driver/executor/executor_test.go @@ -30,7 +30,7 @@ var ( } ) -func mockAllocDir(t *testing.T) (string, *allocdir.AllocDir) { +func mockAllocDir(t *testing.T) (*structs.Task, *allocdir.AllocDir) { alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] @@ -39,18 +39,16 @@ func mockAllocDir(t *testing.T) (string, *allocdir.AllocDir) { log.Panicf("allocDir.Build() failed: %v", err) } - return task.Name, allocDir + return task, allocDir } func testExecutorContext(t *testing.T) *ExecutorContext { taskEnv := env.NewTaskEnvironment(mock.Node()) - taskName, allocDir := mockAllocDir(t) + task, allocDir := mockAllocDir(t) ctx := &ExecutorContext{ - TaskEnv: taskEnv, - TaskName: taskName, - AllocDir: allocDir, - TaskResources: constraint, - LogConfig: structs.DefaultLogConfig(), + TaskEnv: taskEnv, + Task: task, + AllocDir: allocDir, } return ctx } @@ -80,6 +78,9 @@ func TestExecutor_Start_Wait_Failure_Code(t *testing.T) { if ps.ExitCode < 1 { t.Fatalf("expected exit code to be non zero, actual: %v", ps.ExitCode) } + if err := executor.Exit(); err != nil { + t.Fatalf("error: %v", err) + } } func TestExecutor_Start_Wait(t *testing.T) { @@ -98,6 +99,9 @@ func TestExecutor_Start_Wait(t *testing.T) { if err != nil { t.Fatalf("error in waiting for command: %v", err) } + if err := executor.Exit(); err != nil { + t.Fatalf("error: %v", err) + } expected := "hello world" file := filepath.Join(ctx.AllocDir.LogDir(), "web.stdout.0") @@ -119,9 +123,9 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) { ctx := testExecutorContext(t) defer ctx.AllocDir.Destroy() - ctx.FSIsolation = true - ctx.ResourceLimits = true - ctx.UnprivilegedUser = true + 
execCmd.FSIsolation = true + execCmd.ResourceLimits = true + execCmd.User = "nobody" executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags)) ps, err := executor.LaunchCmd(&execCmd, ctx) @@ -135,6 +139,9 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) { if err != nil { t.Fatalf("error in waiting for command: %v", err) } + if err := executor.Exit(); err != nil { + t.Fatalf("error: %v", err) + } expected := "hello world" file := filepath.Join(ctx.AllocDir.LogDir(), "web.stdout.0") @@ -154,13 +161,13 @@ func TestExecutor_DestroyCgroup(t *testing.T) { execCmd := ExecCommand{Cmd: "/bin/bash", Args: []string{"-c", "/usr/bin/yes"}} ctx := testExecutorContext(t) - ctx.LogConfig.MaxFiles = 1 - ctx.LogConfig.MaxFileSizeMB = 300 + ctx.Task.LogConfig.MaxFiles = 1 + ctx.Task.LogConfig.MaxFileSizeMB = 300 defer ctx.AllocDir.Destroy() - ctx.FSIsolation = true - ctx.ResourceLimits = true - ctx.UnprivilegedUser = true + execCmd.FSIsolation = true + execCmd.ResourceLimits = true + execCmd.User = "nobody" executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags)) ps, err := executor.LaunchCmd(&execCmd, ctx) @@ -171,7 +178,10 @@ func TestExecutor_DestroyCgroup(t *testing.T) { t.Fatalf("expected process to start and have non zero pid") } time.Sleep(200 * time.Millisecond) - executor.Exit() + if err := executor.Exit(); err != nil { + t.Fatalf("err: %v", err) + } + file := filepath.Join(ctx.AllocDir.LogDir(), "web.stdout.0") finfo, err := os.Stat(file) if err != nil { @@ -203,6 +213,9 @@ func TestExecutor_Start_Kill(t *testing.T) { if err != nil { t.Fatalf("error in waiting for command: %v", err) } + if err := executor.Exit(); err != nil { + t.Fatalf("error: %v", err) + } file := filepath.Join(ctx.AllocDir.LogDir(), "web.stdout.0") time.Sleep(time.Duration(tu.TestMultiplier()*2) * time.Second) diff --git a/client/driver/executor/executor_windows.go b/client/driver/executor/executor_windows.go new file mode 100644 index 000000000..e93f936e7 --- /dev/null +++ b/client/driver/executor/executor_windows.go @@ -0,0 +1,5 @@ +package executor + +func (e *UniversalExecutor) LaunchSyslogServer(ctx *ExecutorContext) (*SyslogServerState, error) { + return nil, nil +} diff --git a/client/driver/executor_plugin.go b/client/driver/executor_plugin.go index 1189c19dd..9e6fcda64 100644 --- a/client/driver/executor_plugin.go +++ b/client/driver/executor_plugin.go @@ -1,6 +1,7 @@ package driver import ( + "encoding/gob" "log" "net/rpc" @@ -9,6 +10,14 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +// Registering these types since we have to serialize and de-serialize the Task +// structs over the wire between drivers and the executor. 
+func init() { + gob.Register([]interface{}{}) + gob.Register(map[string]interface{}{}) + gob.Register([]map[string]string{}) +} + type ExecutorRPC struct { client *rpc.Client } @@ -19,12 +28,23 @@ type LaunchCmdArgs struct { Ctx *executor.ExecutorContext } +// LaunchSyslogServerArgs wraps the executor context for the purposes of RPC +type LaunchSyslogServerArgs struct { + Ctx *executor.ExecutorContext +} + func (e *ExecutorRPC) LaunchCmd(cmd *executor.ExecCommand, ctx *executor.ExecutorContext) (*executor.ProcessState, error) { var ps *executor.ProcessState err := e.client.Call("Plugin.LaunchCmd", LaunchCmdArgs{Cmd: cmd, Ctx: ctx}, &ps) return ps, err } +func (e *ExecutorRPC) LaunchSyslogServer(ctx *executor.ExecutorContext) (*executor.SyslogServerState, error) { + var ss *executor.SyslogServerState + err := e.client.Call("Plugin.LaunchSyslogServer", LaunchSyslogServerArgs{Ctx: ctx}, &ss) + return ss, err +} + func (e *ExecutorRPC) Wait() (*executor.ProcessState, error) { var ps executor.ProcessState err := e.client.Call("Plugin.Wait", new(interface{}), &ps) @@ -43,6 +63,10 @@ func (e *ExecutorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error { return e.client.Call("Plugin.UpdateLogConfig", logConfig, new(interface{})) } +func (e *ExecutorRPC) UpdateTask(task *structs.Task) error { + return e.client.Call("Plugin.UpdateTask", task, new(interface{})) +} + type ExecutorRPCServer struct { Impl executor.Executor } @@ -55,6 +79,14 @@ func (e *ExecutorRPCServer) LaunchCmd(args LaunchCmdArgs, ps *executor.ProcessSt return err } +func (e *ExecutorRPCServer) LaunchSyslogServer(args LaunchSyslogServerArgs, ss *executor.SyslogServerState) error { + state, err := e.Impl.LaunchSyslogServer(args.Ctx) + if state != nil { + *ss = *state + } + return err +} + func (e *ExecutorRPCServer) Wait(args interface{}, ps *executor.ProcessState) error { state, err := e.Impl.Wait() if state != nil { @@ -75,6 +107,10 @@ func (e *ExecutorRPCServer) UpdateLogConfig(args *structs.LogConfig, resp *inter return e.Impl.UpdateLogConfig(args) } +func (e *ExecutorRPCServer) UpdateTask(args *structs.Task, resp *interface{}) error { + return e.Impl.UpdateTask(args) +} + type ExecutorPlugin struct { logger *log.Logger Impl *ExecutorRPCServer diff --git a/client/driver/java.go b/client/driver/java.go index c8849f951..6d72eea4d 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -154,14 +154,9 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - AllocDir: ctx.AllocDir, - TaskName: task.Name, - TaskResources: task.Resources, - LogConfig: task.LogConfig, - FSIsolation: true, - UnprivilegedUser: true, - ResourceLimits: true, + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + Task: task, } absPath, err := GetAbsolutePath("java") @@ -169,7 +164,13 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } - ps, err := execIntf.LaunchCmd(&executor.ExecCommand{Cmd: absPath, Args: args}, executorCtx) + ps, err := execIntf.LaunchCmd(&executor.ExecCommand{ + Cmd: absPath, + Args: args, + FSIsolation: true, + ResourceLimits: true, + User: cstructs.DefaultUnpriviledgedUser, + }, executorCtx) if err != nil { pluginClient.Kill() return nil, fmt.Errorf("error starting process via the plugin: %v", err) @@ -290,7 +291,7 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult { func (h *javaHandle) Update(task *structs.Task) error { // Store the updated kill timeout. 
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) - h.executor.UpdateLogConfig(task.LogConfig) + h.executor.UpdateTask(task) // Update is not possible return nil diff --git a/client/driver/logging/syslog_server_windows.go b/client/driver/logging/syslog_server_windows.go new file mode 100644 index 000000000..cc6a60840 --- /dev/null +++ b/client/driver/logging/syslog_server_windows.go @@ -0,0 +1,10 @@ +package logging + +type SyslogServer struct { +} + +func (s *SyslogServer) Shutdown() { +} + +type SyslogMessage struct { +} diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 1c8d89cf5..ba9b77397 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -191,11 +191,9 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - AllocDir: ctx.AllocDir, - TaskName: task.Name, - TaskResources: task.Resources, - LogConfig: task.LogConfig, + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + Task: task, } ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: args[0], Args: args[1:]}, executorCtx) if err != nil { @@ -292,7 +290,7 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult { func (h *qemuHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) - h.executor.UpdateLogConfig(task.LogConfig) + h.executor.UpdateTask(task) // Update is not possible return nil diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 3b1c87372..db7635959 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -96,11 +96,9 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - AllocDir: ctx.AllocDir, - TaskName: task.Name, - TaskResources: task.Resources, - LogConfig: task.LogConfig, + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + Task: task, } ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx) if err != nil { @@ -195,7 +193,7 @@ func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult { func (h *rawExecHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout) - h.executor.UpdateLogConfig(task.LogConfig) + h.executor.UpdateTask(task) // Update is not possible return nil diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 3dea50b04..ab7eb6472 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -233,12 +233,9 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - AllocDir: ctx.AllocDir, - TaskName: task.Name, - TaskResources: task.Resources, - UnprivilegedUser: false, - LogConfig: task.LogConfig, + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + Task: task, } absPath, err := GetAbsolutePath("rkt") @@ -329,7 +326,7 @@ func (h *rktHandle) WaitCh() chan *cstructs.WaitResult { func (h *rktHandle) Update(task *structs.Task) error { // Store the updated kill timeout. 
 	h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
-	h.executor.UpdateLogConfig(task.LogConfig)
+	h.executor.UpdateTask(task)
 
 	// Update is not possible
 	return nil
diff --git a/client/driver/structs/structs.go b/client/driver/structs/structs.go
index cafd25c26..e304fd679 100644
--- a/client/driver/structs/structs.go
+++ b/client/driver/structs/structs.go
@@ -6,6 +6,10 @@ import (
 	cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
 )
 
+const (
+	DefaultUnpriviledgedUser = "nobody"
+)
+
 // WaitResult stores the result of a Wait operation.
 type WaitResult struct {
 	ExitCode int

From abf8e50c2985a66d3a200224dbe65be4eb7da13e Mon Sep 17 00:00:00 2001
From: Diptanu Choudhury
Date: Fri, 18 Mar 2016 11:53:25 -0700
Subject: [PATCH 06/13] Destroying the plugin if kill wasn't successful

---
 client/driver/docker.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/client/driver/docker.go b/client/driver/docker.go
index 5829b7ad5..bc198f7fc 100644
--- a/client/driver/docker.go
+++ b/client/driver/docker.go
@@ -759,9 +759,13 @@ func (h *DockerHandle) Kill() error {
 		// Container has already been removed.
 		if strings.Contains(err.Error(), NoSuchContainerError) {
 			h.logger.Printf("[DEBUG] driver.docker: attempted to stop non-existent container %s", h.containerID)
+			h.executor.Exit()
+			h.pluginClient.Kill()
 			return nil
 		}
 		h.logger.Printf("[ERR] driver.docker: failed to stop container %s: %v", h.containerID, err)
+		h.executor.Exit()
+		h.pluginClient.Kill()
 		return fmt.Errorf("Failed to stop container %s: %s", h.containerID, err)
 	}
 	h.logger.Printf("[INFO] driver.docker: stopped container %s", h.containerID)

From ddbf18f02af89767950625a7ae20bf23d0e5b03d Mon Sep 17 00:00:00 2001
From: Diptanu Choudhury
Date: Fri, 18 Mar 2016 12:04:11 -0700
Subject: [PATCH 07/13] Removing all the destroy logic from wait and calling
 exit after wait on all drivers

---
 client/driver/exec.go              |  3 +++
 client/driver/executor/executor.go | 11 ++++++-----
 client/driver/java.go              |  1 +
 client/driver/qemu.go              |  1 +
 client/driver/raw_exec.go          |  3 +++
 client/driver/rkt.go               |  3 +++
 6 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/client/driver/exec.go b/client/driver/exec.go
index c3e99a3e8..f097d0079 100644
--- a/client/driver/exec.go
+++ b/client/driver/exec.go
@@ -268,5 +268,8 @@ func (h *execHandle) run() {
 	}
 	h.waitCh <- cstructs.NewWaitResult(ps.ExitCode, 0, err)
 	close(h.waitCh)
+	if err := h.executor.Exit(); err != nil {
+		h.logger.Printf("[ERR] driver.exec: error destroying executor: %v", err)
+	}
 	h.pluginClient.Kill()
 }
diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go
index 9ace1cb72..7afcef812 100644
--- a/client/driver/executor/executor.go
+++ b/client/driver/executor/executor.go
@@ -241,11 +241,6 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error {
 func (e *UniversalExecutor) wait() {
 	defer close(e.processExited)
 	err := e.cmd.Wait()
-	if e.syslogServer != nil {
-		e.syslogServer.Shutdown()
-	}
-	e.lre.Close()
-	e.lro.Close()
 	if err == nil {
 		e.exitState = &ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()}
 		return
@@ -269,6 +264,12 @@ var (
 // process
 func (e *UniversalExecutor) Exit() error {
 	var merr multierror.Error
+	if e.syslogServer != nil {
+		e.syslogServer.Shutdown()
+	}
+	e.lre.Close()
+	e.lro.Close()
+
 	if e.command != nil && e.cmd.Process != nil {
 		proc, err := os.FindProcess(e.cmd.Process.Pid)
 		if err != nil {
diff --git a/client/driver/java.go b/client/driver/java.go
index 6d72eea4d..19b66ff84 100644
--- a/client/driver/java.go
+++
b/client/driver/java.go
@@ -339,5 +339,6 @@ func (h *javaHandle) run() {
 	}
 	h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err}
 	close(h.waitCh)
+	h.executor.Exit()
 	h.pluginClient.Kill()
 }
diff --git a/client/driver/qemu.go b/client/driver/qemu.go
index ba9b77397..47d0ed5c5 100644
--- a/client/driver/qemu.go
+++ b/client/driver/qemu.go
@@ -334,5 +334,6 @@ func (h *qemuHandle) run() {
 	close(h.doneCh)
 	h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err}
 	close(h.waitCh)
+	h.executor.Exit()
 	h.pluginClient.Kill()
 }
diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go
index db7635959..564d8061a 100644
--- a/client/driver/raw_exec.go
+++ b/client/driver/raw_exec.go
@@ -235,5 +235,8 @@ func (h *rawExecHandle) run() {
 	}
 	h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err}
 	close(h.waitCh)
+	if err := h.executor.Exit(); err != nil {
+		h.logger.Printf("[ERR] driver.raw_exec: error killing executor: %v", err)
+	}
 	h.pluginClient.Kill()
 }
diff --git a/client/driver/rkt.go b/client/driver/rkt.go
index ab7eb6472..e2fb6135d 100644
--- a/client/driver/rkt.go
+++ b/client/driver/rkt.go
@@ -357,5 +357,8 @@ func (h *rktHandle) run() {
 	}
 	h.waitCh <- cstructs.NewWaitResult(ps.ExitCode, 0, err)
 	close(h.waitCh)
+	if err := h.executor.Exit(); err != nil {
+		h.logger.Printf("[ERR] driver.rkt: error killing executor: %v", err)
+	}
 	h.pluginClient.Kill()
 }

From 4f01d2f8bf0d818a2bb30d8cea6858971d54517b Mon Sep 17 00:00:00 2001
From: Jake Champlin
Date: Fri, 18 Mar 2016 15:05:35 -0400
Subject: [PATCH 08/13] Allow fs commands to use job-id

Adds a `-job` flag argument to the `nomad fs` commands to randomly look up
a job's allocation ID to use in an `fs` command. Can be used when debugging
a job where a specific allocation ID is not a strict requirement.

---
 command/fs.go                               | 28 ++++++++++++++++++++-
 command/fs_cat.go                           | 18 ++++++++++---
 command/fs_ls.go                            | 20 ++++++++++++---
 command/fs_stat.go                          | 18 ++++++++++---
 website/source/docs/commands/fs.html.md.erb | 13 +++++++++-
 5 files changed, 86 insertions(+), 11 deletions(-)

diff --git a/command/fs.go b/command/fs.go
index 3925d044b..4b3a47174 100644
--- a/command/fs.go
+++ b/command/fs.go
@@ -1,6 +1,12 @@
 package command
 
-import "github.com/mitchellh/cli"
+import (
+	"fmt"
+	"math/rand"
+	"time"
+
+	"github.com/hashicorp/nomad/api"
+	"github.com/mitchellh/cli"
+)
 
 type FSCommand struct {
 	Meta
@@ -17,3 +23,23 @@ func (f *FSCommand) Synopsis() string {
 func (f *FSCommand) Run(args []string) int {
 	return cli.RunResultHelp
 }
+
+// Get a random allocation ID from a known job ID. Prefer to use a running
+// allocation, but use a dead allocation if no running allocations are found.
+func getRandomJobAlloc(client *api.Client, jobID string) (string, error) {
+	var runningAllocs []*api.AllocationListStub
+	allocs, _, err := client.Jobs().Allocations(jobID, nil)
+	if err != nil {
+		return "", err
+	}
+	for _, v := range allocs {
+		if v.ClientStatus == "running" {
+			runningAllocs = append(runningAllocs, v)
+		}
+	}
+	// If we don't have any allocations running, use dead allocations
+	if len(runningAllocs) < 1 {
+		runningAllocs = allocs
+	}
+	// Guard against jobs with no allocations at all so rand.Intn(0) can't panic
+	if len(runningAllocs) == 0 {
+		return "", fmt.Errorf("no allocations found for job %q", jobID)
+	}
+
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	allocID := runningAllocs[r.Intn(len(runningAllocs))].ID
+	return allocID, nil
+}
diff --git a/command/fs_cat.go b/command/fs_cat.go
index 2322846d8..83d080b94 100644
--- a/command/fs_cat.go
+++ b/command/fs_cat.go
@@ -26,6 +26,9 @@ Cat Options:
 
   -verbose
     Show full information.
+
+  -job
+    Use a random allocation from the specified job ID.
 `
 	return strings.TrimSpace(helpText)
 }
@@ -35,10 +38,11 @@ func (f *FSCatCommand) Synopsis() string {
 }
 
 func (f *FSCatCommand) Run(args []string) int {
-	var verbose bool
+	var verbose, job bool
 	flags := f.Meta.FlagSet("fs-list", FlagSetClient)
 	flags.Usage = func() { f.Ui.Output(f.Help()) }
 	flags.BoolVar(&verbose, "verbose", false, "")
+	flags.BoolVar(&job, "job", false, "")
 
 	if err := flags.Parse(args); err != nil {
 		return 1
@@ -50,7 +54,6 @@ func (f *FSCatCommand) Run(args []string) int {
 		return 1
 	}
 
-	allocID := args[0]
 	path := "/"
 	if len(args) == 2 {
 		path = args[1]
@@ -58,10 +61,19 @@ func (f *FSCatCommand) Run(args []string) int {
 
 	client, err := f.Meta.Client()
 	if err != nil {
-		f.Ui.Error(fmt.Sprintf("Error inititalizing client: %v", err))
+		f.Ui.Error(fmt.Sprintf("Error initializing client: %v", err))
 		return 1
 	}
 
+	// If -job is specified, use a random allocation; otherwise use the provided allocation
+	allocID := args[0]
+	if job {
+		allocID, err = getRandomJobAlloc(client, args[0])
+		if err != nil {
+			f.Ui.Error(fmt.Sprintf("Error querying API: %v", err))
+			return 1
+		}
+	}
+
 	// Truncate the id unless full length is requested
 	length := shortId
 	if verbose {
diff --git a/command/fs_ls.go b/command/fs_ls.go
index a15373d95..d86de1bf1 100644
--- a/command/fs_ls.go
+++ b/command/fs_ls.go
@@ -4,7 +4,7 @@ import (
 	"fmt"
 	"strings"
 
-	"github.com/dustin/go-humanize"
+	humanize "github.com/dustin/go-humanize"
 )
 
 type FSListCommand struct {
@@ -30,6 +30,9 @@ Ls Options:
 
   -verbose
     Show full information.
 
+  -job
+    Use a random allocation from the specified job ID.
+
 `
 	return strings.TrimSpace(helpText)
 }
@@ -41,10 +44,12 @@ func (f *FSListCommand) Synopsis() string {
 func (f *FSListCommand) Run(args []string) int {
 	var verbose bool
 	var machine bool
+	var job bool
 	flags := f.Meta.FlagSet("fs-list", FlagSetClient)
 	flags.Usage = func() { f.Ui.Output(f.Help()) }
 	flags.BoolVar(&verbose, "verbose", false, "")
 	flags.BoolVar(&machine, "H", false, "")
+	flags.BoolVar(&job, "job", false, "")
 
 	if err := flags.Parse(args); err != nil {
 		return 1
@@ -56,7 +61,6 @@ func (f *FSListCommand) Run(args []string) int {
 		return 1
 	}
 
-	allocID := args[0]
 	path := "/"
 	if len(args) == 2 {
 		path = args[1]
@@ -64,10 +68,20 @@ func (f *FSListCommand) Run(args []string) int {
 
 	client, err := f.Meta.Client()
 	if err != nil {
-		f.Ui.Error(fmt.Sprintf("Error inititalizing client: %v", err))
+		f.Ui.Error(fmt.Sprintf("Error initializing client: %v", err))
 		return 1
 	}
 
+	// If -job is specified, use a random allocation; otherwise use the provided allocation
+	allocID := args[0]
+	if job {
+		allocID, err = getRandomJobAlloc(client, args[0])
+		if err != nil {
+			f.Ui.Error(fmt.Sprintf("Error fetching allocations: %v", err))
+			return 1
+		}
+	}
+
 	// Truncate the id unless full length is requested
 	length := shortId
 	if verbose {
diff --git a/command/fs_stat.go b/command/fs_stat.go
index 4f20a4a47..6caf3ab76 100644
--- a/command/fs_stat.go
+++ b/command/fs_stat.go
@@ -4,7 +4,7 @@ import (
 	"fmt"
 	"strings"
 
-	"github.com/dustin/go-humanize"
+	humanize "github.com/dustin/go-humanize"
 )
 
 type FSStatCommand struct {
@@ -29,6 +29,9 @@ Stat Options:
 
   -verbose
     Show full information.
+
+  -job
+    Use a random allocation from the specified job ID.
 `
 	return strings.TrimSpace(helpText)
 }
@@ -40,10 +43,12 @@ func (f *FSStatCommand) Synopsis() string {
 func (f *FSStatCommand) Run(args []string) int {
 	var verbose bool
 	var machine bool
+	var job bool
 	flags := f.Meta.FlagSet("fs-list", FlagSetClient)
 	flags.Usage = func() { f.Ui.Output(f.Help()) }
 	flags.BoolVar(&verbose, "verbose", false, "")
 	flags.BoolVar(&machine, "H", false, "")
+	flags.BoolVar(&job, "job", false, "")
 
 	if err := flags.Parse(args); err != nil {
 		return 1
@@ -55,7 +60,6 @@ func (f *FSStatCommand) Run(args []string) int {
 		return 1
 	}
 
-	allocID := args[0]
 	path := "/"
 	if len(args) == 2 {
 		path = args[1]
@@ -63,10 +67,18 @@ func (f *FSStatCommand) Run(args []string) int {
 
 	client, err := f.Meta.Client()
 	if err != nil {
-		f.Ui.Error(fmt.Sprintf("Error inititalizing client: %v", err))
+		f.Ui.Error(fmt.Sprintf("Error initializing client: %v", err))
 		return 1
 	}
 
+	// If -job is specified, use a random allocation; otherwise use the provided allocation
+	allocID := args[0]
+	if job {
+		allocID, err = getRandomJobAlloc(client, args[0])
+		if err != nil {
+			f.Ui.Error(fmt.Sprintf("Error querying API: %v", err))
+			return 1
+		}
+	}
+
 	// Truncate the id unless full length is requested
 	length := shortId
 	if verbose {
diff --git a/website/source/docs/commands/fs.html.md.erb b/website/source/docs/commands/fs.html.md.erb
index f4b7616cf..645656467 100644
--- a/website/source/docs/commands/fs.html.md.erb
+++ b/website/source/docs/commands/fs.html.md.erb
@@ -23,7 +23,7 @@
 nomad fs stat <alloc-id> <path>
 nomad fs cat <alloc-id> <path>
 ```
 
-A valid allocation id is necessary and the path is relative to the root of the allocation directory.
+A valid allocation id is necessary unless `-job` is specified, and the path is relative to the root of the allocation directory.
 The path is optional and it defaults to `/` of the allocation directory.
 
 ## Examples
@@ -50,3 +50,14 @@
 $ nomad fs cat redis/local/redis.stdout
 6710:C 27 Jan 22:04:03.794 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
 6710:M 27 Jan 22:04:03.795 * Increased maximum number of open files to 10032 (it was originally set to 256).
+
+## Using Job-ID instead of Alloc-ID
+
+Passing `-job` into one of the `fs` commands will allow the `fs` command to
+randomly select an allocation ID from the specified job.
+
+```
+nomad fs ls -job <job-id>
+```
+
+Nomad will prefer to select a running allocation ID for the job, but if no
+running allocations for the job are found, Nomad will use a dead allocation.
+
+This can be useful for debugging a job that has multiple allocations, where
+any allocation will do and a specific allocation ID is not required.
\ No newline at end of file

From 038a1bc2f95534a9e7a872b13d862c09358a11fd Mon Sep 17 00:00:00 2001
From: Diptanu Choudhury
Date: Fri, 18 Mar 2016 15:04:15 -0700
Subject: [PATCH 09/13] Added some comments

---
 client/driver/executor/executor.go      | 3 +++
 client/driver/executor/executor_test.go | 2 +-
 client/driver/structs/structs.go        | 1 +
 3 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go
index 7afcef812..ceba6b0ea 100644
--- a/client/driver/executor/executor.go
+++ b/client/driver/executor/executor.go
@@ -211,6 +211,7 @@ func (e *UniversalExecutor) Wait() (*ProcessState, error) {
 	return e.exitState, nil
 }
 
+// COMPAT: prior to Nomad 0.3.2, UpdateTask didn't exist.
 // UpdateLogConfig updates the log configuration
 func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error {
 	e.ctx.Task.LogConfig = logConfig
@@ -230,6 +231,8 @@ func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error
 
 func (e *UniversalExecutor) UpdateTask(task *structs.Task) error {
 	e.ctx.Task = task
+
+	// Update the log configuration
 	fileSize := int64(task.LogConfig.MaxFileSizeMB * 1024 * 1024)
 	e.lro.MaxFiles = task.LogConfig.MaxFiles
 	e.lro.FileSize = fileSize
diff --git a/client/driver/executor/executor_test.go b/client/driver/executor/executor_test.go
index 8798170e7..629a930af 100644
--- a/client/driver/executor/executor_test.go
+++ b/client/driver/executor/executor_test.go
@@ -125,7 +125,7 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) {
 
 	execCmd.FSIsolation = true
 	execCmd.ResourceLimits = true
-	execCmd.User = "nobody"
+	execCmd.User = cstructs.DefaultUnpriviledgedUser
 
 	executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
 	ps, err := executor.LaunchCmd(&execCmd, ctx)
diff --git a/client/driver/structs/structs.go b/client/driver/structs/structs.go
index e304fd679..a23990427 100644
--- a/client/driver/structs/structs.go
+++ b/client/driver/structs/structs.go
@@ -7,6 +7,7 @@ import (
 )
 
 const (
+	// The default user that the executor uses to run tasks
 	DefaultUnpriviledgedUser = "nobody"
 )
 

From a37670974cb59a6a29da10fd0fc2f6c445e6fecb Mon Sep 17 00:00:00 2001
From: Pete Shima
Date: Fri, 18 Mar 2016 20:43:31 -0700
Subject: [PATCH 10/13] Add slightly more detail to the Nomad upgrade guide.

---
 website/source/docs/upgrade/index.html.md | 116 ++++++++++++++++++----
 1 file changed, 98 insertions(+), 18 deletions(-)

diff --git a/website/source/docs/upgrade/index.html.md b/website/source/docs/upgrade/index.html.md
index 0184cd70d..8ab82d4dc 100644
--- a/website/source/docs/upgrade/index.html.md
+++ b/website/source/docs/upgrade/index.html.md
@@ -1,6 +1,6 @@
 ---
 layout: "docs"
-page_title: "Upgrade Nomad"
+page_title: "Upgrading Nomad"
 sidebar_current: "docs-upgrade-upgrading"
 description: |-
   Learn how to upgrade Nomad.
@@ -8,29 +8,109 @@ description: |-
 
 # Upgrading Nomad
 
-Both Nomad Clients and Servers are meant to be long-running processes that
-maintain communication with each other. Nomad Servers maintain quorum with other
-Servers and Clients are in constant communication with Servers. As such, care
-should be taken to properly upgrade Nomad to ensure minimal service disruption.
-
 This page documents how to upgrade Nomad when a new version is released.
 
-## Standard Upgrades
+~> **Upgrade Warning!** Both Nomad Clients and Servers are meant to be
+long-running processes that maintain communication with each other. Nomad
+Servers maintain quorum with other Servers, and Clients are in constant
+communication with Servers. As such, care should be taken to properly
+upgrade Nomad to ensure minimal service disruption. Unsafe upgrades can
+cause a service outage.
+
+## Upgrade Process
 
 For upgrades we strive to ensure backwards compatibility. For most upgrades, the
-process is simple. Assuming the current version of Nomad is A, and version B is
-released.
+process is as simple as upgrading the binary and restarting the service.
 
-1. On each server, install version B of Nomad.
+Prior to starting the upgrade, please check the
+[specific version details](/docs/upgrade/upgrade-specific.html) page as some
+version differences may require specific steps.
 
-2. Shut down version A, restart with version B on one server at a time.
+At a high level we complete the following steps to upgrade Nomad:
+
+* **Add the new version**
+* **Check cluster health**
+* **Remove the old version**
+* **Check cluster health**
+* **Upgrade clients**
+
+### 1. Add the new version to the existing cluster
+
+Whether you are replacing the software in place on existing systems or bringing
+up new hosts, you should make changes incrementally, verifying cluster health at
+each step of the upgrade.
+
+On a single server, install the new version of Nomad. You can do this by
+joining a new server to the cluster or by replacing or upgrading the binary
+locally and restarting the service.
+
+### 2. Check cluster health
+
+Monitor the Nomad logs on the remaining nodes to check that the new node has
+entered the cluster correctly.
+
+Run `nomad agent-info` on the new server and check that the `last_log_index` is
+of a similar value to that of the other nodes. This step ensures that changes
+have been replicated to the new node.
+
+```
+ubuntu@nomad-server-10-1-1-4:~$ nomad agent-info
+nomad
+  bootstrap = false
+  known_regions = 1
+  leader = false
+  server = true
+raft
+  applied_index = 53460
+  commit_index = 53460
+  fsm_pending = 0
+  last_contact = 54.512216ms
+  last_log_index = 53460
+  last_log_term = 1
+  last_snapshot_index = 49511
+  last_snapshot_term = 1
+  num_peers = 2
+...
+```
+
+Continue with the upgrades across the Server fleet, making sure to upgrade a
+single Nomad server at a time. You can check the state of the servers and
+clients with the `nomad server-members` and `nomad node-status` commands, which
+report the state of each node.
+
+### 3. Remove the old versions from servers
+
+If you are doing an in-place upgrade on existing servers, this step is not
+necessary as the version was changed in place.
+
+If you are doing an upgrade by adding new servers and removing old servers
+from the fleet, you need to ensure that each old server has left the fleet
+safely:
+
+1. Stop the service on the existing host.
+2. On another server, issue `nomad server-members` and check the status; if
+the server is now in the left state, you are safe to continue.
+3. If the server is not in the left state, issue `nomad server-force-leave <name>`
+to remove the server from the cluster.
+
+Monitor the logs of the other hosts in the Nomad cluster over this period.
+
+### 4. Check cluster health
+
+Use the same actions in step #2 above to confirm cluster health.
+
+### 5. Upgrade clients
+
+Following the successful upgrade of the servers, you can now upgrade your
+clients using a similar process. If you wish to gracefully move tasks off a
+client, use the `nomad node-drain <node-id>` command to migrate its jobs to
+another client in the cluster. The `node-drain` command prevents new tasks from
+being allocated to the client and begins migrating existing allocations to
+another client.
+
+## Done!
+
+You are now running the latest Nomad version. You can verify that all Clients
+have joined by running `nomad node-status` and checking that all the clients
+are in the `ready` state.
 
-  3. You can run `nomad server-members` to ensure that all servers are
-  clustered and running the version B.
-
-4. Once all the servers are upgraded, begin a rollout of clients following
-  the same process.
-
-5. Done! You are now running the latest Nomad version. You can verify all
-  Clients joined by running `nomad node-status` and checking all the clients
-  are in a `ready` state.
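The `ready` check in the last step can also be scripted against the Nomad API rather than read off the CLI. The following is a minimal, illustrative sketch only — it is not part of any patch in this series. It assumes the `github.com/hashicorp/nomad/api` package as it exists at this point in the tree (the same package PATCH 08 uses) and a locally reachable agent:

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Connect to the agent; DefaultConfig falls back to the local
	// agent address and honors NOMAD_ADDR when it is set.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatalf("error creating API client: %v", err)
	}

	// List all client nodes registered with the servers.
	nodes, _, err := client.Nodes().List(nil)
	if err != nil {
		log.Fatalf("error listing nodes: %v", err)
	}

	// Flag any node that has not come back "ready" after the upgrade.
	for _, n := range nodes {
		if n.Status != "ready" {
			fmt.Printf("node %s (%s) is %s\n", n.ID, n.Name, n.Status)
		}
	}
}
```

Running this after each server or client is cycled gives the same signal as `nomad node-status`, in a form that is easy to wire into an upgrade script.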
From ecd798b3acb78a61b1be2887ebd4e9a73c042437 Mon Sep 17 00:00:00 2001
From: Pete Shima
Date: Fri, 18 Mar 2016 21:32:56 -0700
Subject: [PATCH 11/13] Minor updates to Interpreted Variables docs

---
 .../source/docs/jobspec/interpreted.html.md   | 23 ++++++++++++-------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/website/source/docs/jobspec/interpreted.html.md b/website/source/docs/jobspec/interpreted.html.md
index 0fdeb0c5d..a912784e8 100644
--- a/website/source/docs/jobspec/interpreted.html.md
+++ b/website/source/docs/jobspec/interpreted.html.md
@@ -65,30 +65,37 @@ driver.
   <tr>
     <th>Variable</th>
     <th>Description</th>
+    <th>Example</th>
   </tr>
   <tr>
    <td>${node.unique.id}</td>
-    <td>The client node identifier</td>
+    <td>The 36 character unique client node identifier</td>
+    <td>9afa5da1-8f39-25a2-48dc-ba31fd7c0023</td>
   </tr>
   <tr>
    <td>${node.datacenter}</td>
-    <td>The client node datacenter</td>
+    <td>The client node's datacenter</td>
+    <td>dc1</td>
  </tr>
  <tr>
    <td>${node.unique.name}</td>
-    <td>The client node name</td>
+    <td>The client node's name</td>
+    <td>nomad-client-10-1-2-4</td>
  </tr>
  <tr>
    <td>${node.class}</td>
-    <td>The client node class</td>
+    <td>The client node's class</td>
+    <td>linux-64bit</td>
  </tr>
  <tr>
    <td>${attr.\<key\>}</td>
    <td>The attribute given by `key` on the client node.</td>
+    <td>platform.aws.instance-type:r3.large</td>
  </tr>
  <tr>
    <td>${meta.\<key\>}</td>
    <td>The metadata value given by `key` on the client node.</td>
+    <td></td>
  </tr>
@@ -177,19 +184,19 @@ a particular node and as such can not be used in constraints.
    <td>The CPU limit in MHz for the task</td>
  </tr>
  <tr>
-    <td>NOMAD_ALLOC_ID</td>
+    <td>${NOMAD_ALLOC_ID}</td>
    <td>The allocation ID of the task</td>
  </tr>
  <tr>
-    <td>NOMAD_ALLOC_NAME</td>
+    <td>${NOMAD_ALLOC_NAME}</td>
    <td>The allocation name of the task</td>
  </tr>
  <tr>
-    <td>NOMAD_ALLOC_INDEX</td>
+    <td>${NOMAD_ALLOC_INDEX}</td>
    <td>The allocation index; useful to distinguish instances of task groups</td>
  </tr>
  <tr>
-    <td>NOMAD_TASK_NAME</td>
+    <td>${NOMAD_TASK_NAME}</td>
    <td>The task's name</td>
  </tr>

From 5a5ef65bc4e43a66a2a672034dc51450c342ce24 Mon Sep 17 00:00:00 2001
From: Abhishek Chanda
Date: Fri, 18 Mar 2016 22:18:59 -0700
Subject: [PATCH 12/13] Update rkt and docker

---
 scripts/install_rkt.sh   | 2 +-
 scripts/update_docker.sh | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/scripts/install_rkt.sh b/scripts/install_rkt.sh
index d3987e480..9d404f3ee 100755
--- a/scripts/install_rkt.sh
+++ b/scripts/install_rkt.sh
@@ -2,7 +2,7 @@
 
 set -ex
 
-RKT_VERSION="v1.0.0"
+RKT_VERSION="v1.2.0"
 DEST_DIR="/usr/local/bin"
 
 sudo mkdir -p /etc/rkt/net.d
diff --git a/scripts/update_docker.sh b/scripts/update_docker.sh
index 18b258ee4..6e7a7074d 100755
--- a/scripts/update_docker.sh
+++ b/scripts/update_docker.sh
@@ -2,7 +2,7 @@
 
 set -ex
 
-DOCKER_VERSION="1.10.2"
+DOCKER_VERSION="1.10.3"
 
 sudo stop docker
 sudo rm -rf /var/lib/docker

From 2d3ccc1898c810f249cf447c466d10d4b11697cb Mon Sep 17 00:00:00 2001
From: Abhishek Chanda
Date: Fri, 18 Mar 2016 23:00:34 -0700
Subject: [PATCH 13/13] Fix missing import

---
 client/driver/executor/executor_test.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/client/driver/executor/executor_test.go b/client/driver/executor/executor_test.go
index 629a930af..0a43e3494 100644
--- a/client/driver/executor/executor_test.go
+++ b/client/driver/executor/executor_test.go
@@ -15,6 +15,7 @@ import (
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	tu "github.com/hashicorp/nomad/testutil"
+	cstructs "github.com/hashicorp/nomad/client/driver/structs"
 )
 
 var (
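The random-allocation lookup introduced in PATCH 08 can likewise be driven directly from the Go API. Below is a minimal sketch of the same pattern — prefer running allocations, fall back to dead ones, and refuse jobs with no allocations at all. The helper name `pickAlloc` and the job name `redis` are assumptions for illustration, not part of the patches above:

```go
package main

import (
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/hashicorp/nomad/api"
)

// pickAlloc mirrors getRandomJobAlloc from PATCH 08: prefer a running
// allocation, fall back to dead ones, and error out when none exist.
func pickAlloc(client *api.Client, jobID string) (string, error) {
	allocs, _, err := client.Jobs().Allocations(jobID, nil)
	if err != nil {
		return "", err
	}

	var running []*api.AllocationListStub
	for _, a := range allocs {
		if a.ClientStatus == "running" {
			running = append(running, a)
		}
	}
	// No running allocations: fall back to whatever the job has.
	if len(running) == 0 {
		running = allocs
	}
	// Without this guard, rand.Intn(0) would panic for jobs that
	// have never been allocated.
	if len(running) == 0 {
		return "", fmt.Errorf("no allocations found for job %q", jobID)
	}

	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	return running[r.Intn(len(running))].ID, nil
}

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	id, err := pickAlloc(client, "redis")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("chosen allocation:", id)
}
```

This is the same design choice the `fs` commands make: when any allocation will do, a random running one gives the most representative view of the job's filesystem, and a dead allocation is still better than nothing when debugging a failed job.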