commands: move exec command to separate pkg

This commit is contained in:
Frank Schroeder 2017-10-11 14:51:22 +02:00 committed by Frank Schröder
parent ad50e2a410
commit 8c7f013ae1
3 changed files with 117 additions and 122 deletions

View File

@ -9,6 +9,7 @@ import (
"syscall" "syscall"
"github.com/hashicorp/consul/command/event" "github.com/hashicorp/consul/command/event"
execmd "github.com/hashicorp/consul/command/exec"
"github.com/hashicorp/consul/command/join" "github.com/hashicorp/consul/command/join"
"github.com/hashicorp/consul/command/validate" "github.com/hashicorp/consul/command/validate"
"github.com/hashicorp/consul/version" "github.com/hashicorp/consul/version"
@ -77,13 +78,7 @@ func init() {
}, },
"exec": func() (cli.Command, error) { "exec": func() (cli.Command, error) {
return &ExecCommand{ return execmd.New(ui, makeShutdownCh()), nil
ShutdownCh: makeShutdownCh(),
BaseCommand: BaseCommand{
Flags: FlagSetHTTP,
UI: ui,
},
}, nil
}, },
"force-leave": func() (cli.Command, error) { "force-leave": func() (cli.Command, error) {

View File

@ -1,8 +1,9 @@
package command package exec
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"flag"
"fmt" "fmt"
"io" "io"
"os" "os"
@ -13,7 +14,8 @@ import (
"time" "time"
"unicode" "unicode"
consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/flags"
"github.com/mitchellh/cli" "github.com/mitchellh/cli"
) )
@ -117,47 +119,55 @@ type rExecExit struct {
Code int Code int
} }
// ExecCommand is a Command implementation that is used to func New(ui cli.Ui, shutdownCh <-chan struct{}) *cmd {
// do remote execution of commands c := &cmd{UI: ui, shutdownCh: shutdownCh}
type ExecCommand struct { c.initFlags()
BaseCommand return c
}
ShutdownCh <-chan struct{} type cmd struct {
UI cli.Ui
flags *flag.FlagSet
http *flags.HTTPFlags
shutdownCh <-chan struct{}
conf rExecConf conf rExecConf
client *consulapi.Client apiclient *api.Client
sessionID string sessionID string
stopCh chan struct{} stopCh chan struct{}
} }
func (c *ExecCommand) initFlags() { func (c *cmd) initFlags() {
c.InitFlagSet() c.flags = flag.NewFlagSet("", flag.ContinueOnError)
c.FlagSet.StringVar(&c.conf.node, "node", "", c.flags.StringVar(&c.conf.node, "node", "",
"Regular expression to filter on node names.") "Regular expression to filter on node names.")
c.FlagSet.StringVar(&c.conf.service, "service", "", c.flags.StringVar(&c.conf.service, "service", "",
"Regular expression to filter on service instances.") "Regular expression to filter on service instances.")
c.FlagSet.StringVar(&c.conf.tag, "tag", "", c.flags.StringVar(&c.conf.tag, "tag", "",
"Regular expression to filter on service tags. Must be used with -service.") "Regular expression to filter on service tags. Must be used with -service.")
c.FlagSet.StringVar(&c.conf.prefix, "prefix", rExecPrefix, c.flags.StringVar(&c.conf.prefix, "prefix", rExecPrefix,
"Prefix in the KV store to use for request data.") "Prefix in the KV store to use for request data.")
c.FlagSet.BoolVar(&c.conf.shell, "shell", true, c.flags.BoolVar(&c.conf.shell, "shell", true,
"Use a shell to run the command.") "Use a shell to run the command.")
c.FlagSet.DurationVar(&c.conf.wait, "wait", rExecQuietWait, c.flags.DurationVar(&c.conf.wait, "wait", rExecQuietWait,
"Period to wait with no responses before terminating execution.") "Period to wait with no responses before terminating execution.")
c.FlagSet.DurationVar(&c.conf.replWait, "wait-repl", rExecReplicationWait, c.flags.DurationVar(&c.conf.replWait, "wait-repl", rExecReplicationWait,
"Period to wait for replication before firing event. This is an "+ "Period to wait for replication before firing event. This is an optimization to allow stale reads to be performed.")
"optimization to allow stale reads to be performed.") c.flags.BoolVar(&c.conf.verbose, "verbose", false,
c.FlagSet.BoolVar(&c.conf.verbose, "verbose", false,
"Enables verbose output.") "Enables verbose output.")
c.http = &flags.HTTPFlags{}
flags.Merge(c.flags, c.http.ClientFlags())
flags.Merge(c.flags, c.http.ServerFlags())
} }
func (c *ExecCommand) Run(args []string) int { func (c *cmd) Run(args []string) int {
c.initFlags() if err := c.flags.Parse(args); err != nil {
if err := c.FlagSet.Parse(args); err != nil {
return 1 return 1
} }
// Join the commands to execute // Join the commands to execute
c.conf.cmd = strings.Join(c.FlagSet.Args(), " ") c.conf.cmd = strings.Join(c.flags.Args(), " ")
// If there is no command, read stdin for a script input // If there is no command, read stdin for a script input
if c.conf.cmd == "-" { if c.conf.cmd == "-" {
@ -178,7 +188,7 @@ func (c *ExecCommand) Run(args []string) int {
c.conf.script = buf.Bytes() c.conf.script = buf.Bytes()
} else if !c.conf.shell { } else if !c.conf.shell {
c.conf.cmd = "" c.conf.cmd = ""
c.conf.args = c.FlagSet.Args() c.conf.args = c.flags.Args()
} }
// Ensure we have a command or script // Ensure we have a command or script
@ -196,7 +206,7 @@ func (c *ExecCommand) Run(args []string) int {
} }
// Create and test the HTTP client // Create and test the HTTP client
client, err := c.HTTPClient() client, err := c.http.APIClient()
if err != nil { if err != nil {
c.UI.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) c.UI.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
return 1 return 1
@ -206,10 +216,10 @@ func (c *ExecCommand) Run(args []string) int {
c.UI.Error(fmt.Sprintf("Error querying Consul agent: %s", err)) c.UI.Error(fmt.Sprintf("Error querying Consul agent: %s", err))
return 1 return 1
} }
c.client = client c.apiclient = client
// Check if this is a foreign datacenter // Check if this is a foreign datacenter
if c.HTTPDatacenter() != "" && c.HTTPDatacenter() != info["Config"]["Datacenter"] { if c.http.Datacenter() != "" && c.http.Datacenter() != info["Config"]["Datacenter"] {
if c.conf.verbose { if c.conf.verbose {
c.UI.Info("Remote exec in foreign datacenter, using Session TTL") c.UI.Info("Remote exec in foreign datacenter, using Session TTL")
} }
@ -252,7 +262,7 @@ func (c *ExecCommand) Run(args []string) int {
// largely this is a heuristic. // largely this is a heuristic.
select { select {
case <-time.After(c.conf.replWait): case <-time.After(c.conf.replWait):
case <-c.ShutdownCh: case <-c.shutdownCh:
return 1 return 1
} }
@ -270,8 +280,22 @@ func (c *ExecCommand) Run(args []string) int {
return c.waitForJob() return c.waitForJob()
} }
func (c *cmd) Synopsis() string {
return "Executes a command on Consul nodes"
}
func (c *cmd) Help() string {
s := `Usage: consul exec [options] [-|command...]
Evaluates a command on remote Consul nodes. The nodes responding can
be filtered using regular expressions on node name, service, and tag
definitions. If a command is '-', stdin will be read until EOF
and used as a script input. `
return flags.Usage(s, c.flags, c.http.ClientFlags(), c.http.ServerFlags())
}
// waitForJob is used to poll for results and wait until the job is terminated // waitForJob is used to poll for results and wait until the job is terminated
func (c *ExecCommand) waitForJob() int { func (c *cmd) waitForJob() int {
// Although the session destroy is already deferred, we do it again here, // Although the session destroy is already deferred, we do it again here,
// because invalidation of the session before destroyData() ensures there is // because invalidation of the session before destroyData() ensures there is
// no race condition allowing an agent to upload data (the acquire will fail). // no race condition allowing an agent to upload data (the acquire will fail).
@ -337,7 +361,7 @@ OUTER:
case <-errCh: case <-errCh:
return 1 return 1
case <-c.ShutdownCh: case <-c.shutdownCh:
return 1 return 1
} }
} }
@ -350,10 +374,10 @@ OUTER:
// streamResults is used to perform blocking queries against the KV endpoint and stream in // streamResults is used to perform blocking queries against the KV endpoint and stream in
// notice of various events into waitForJob // notice of various events into waitForJob
func (c *ExecCommand) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart, func (c *cmd) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart,
outputCh chan rExecOutput, exitCh chan rExecExit, errCh chan struct{}) { outputCh chan rExecOutput, exitCh chan rExecExit, errCh chan struct{}) {
kv := c.client.KV() kv := c.apiclient.KV()
opts := consulapi.QueryOptions{WaitTime: c.conf.wait} opts := api.QueryOptions{WaitTime: c.conf.wait}
dir := path.Join(c.conf.prefix, c.sessionID) + "/" dir := path.Join(c.conf.prefix, c.sessionID) + "/"
seen := make(map[string]struct{}) seen := make(map[string]struct{})
@ -465,7 +489,7 @@ func (conf *rExecConf) validate() error {
} }
// createSession is used to create a new session for this command // createSession is used to create a new session for this command
func (c *ExecCommand) createSession() (string, error) { func (c *cmd) createSession() (string, error) {
var id string var id string
var err error var err error
if c.conf.foreignDC { if c.conf.foreignDC {
@ -482,11 +506,11 @@ func (c *ExecCommand) createSession() (string, error) {
// createSessionLocal is used to create a new session in a local datacenter // createSessionLocal is used to create a new session in a local datacenter
// This is simpler since we can use the local agent to create the session. // This is simpler since we can use the local agent to create the session.
func (c *ExecCommand) createSessionLocal() (string, error) { func (c *cmd) createSessionLocal() (string, error) {
session := c.client.Session() session := c.apiclient.Session()
se := consulapi.SessionEntry{ se := api.SessionEntry{
Name: "Remote Exec", Name: "Remote Exec",
Behavior: consulapi.SessionBehaviorDelete, Behavior: api.SessionBehaviorDelete,
TTL: rExecTTL, TTL: rExecTTL,
} }
id, _, err := session.Create(&se, nil) id, _, err := session.Create(&se, nil)
@ -496,9 +520,9 @@ func (c *ExecCommand) createSessionLocal() (string, error) {
// createSessionLocal is used to create a new session in a foreign datacenter // createSessionLocal is used to create a new session in a foreign datacenter
// This is more complex since the local agent cannot be used to create // This is more complex since the local agent cannot be used to create
// a session, and we must associate with a node in the remote datacenter. // a session, and we must associate with a node in the remote datacenter.
func (c *ExecCommand) createSessionForeign() (string, error) { func (c *cmd) createSessionForeign() (string, error) {
// Look for a remote node to bind to // Look for a remote node to bind to
health := c.client.Health() health := c.apiclient.Health()
services, _, err := health.Service("consul", "", true, nil) services, _, err := health.Service("consul", "", true, nil)
if err != nil { if err != nil {
return "", fmt.Errorf("Failed to find Consul server in remote datacenter: %v", err) return "", fmt.Errorf("Failed to find Consul server in remote datacenter: %v", err)
@ -508,16 +532,15 @@ func (c *ExecCommand) createSessionForeign() (string, error) {
} }
node := services[0].Node.Node node := services[0].Node.Node
if c.conf.verbose { if c.conf.verbose {
c.UI.Info(fmt.Sprintf("Binding session to remote node %s@%s", c.UI.Info(fmt.Sprintf("Binding session to remote node %s@%s", node, c.http.Datacenter()))
node, c.HTTPDatacenter()))
} }
session := c.client.Session() session := c.apiclient.Session()
se := consulapi.SessionEntry{ se := api.SessionEntry{
Name: fmt.Sprintf("Remote Exec via %s@%s", c.conf.localNode, c.conf.localDC), Name: fmt.Sprintf("Remote Exec via %s@%s", c.conf.localNode, c.conf.localDC),
Node: node, Node: node,
Checks: []string{}, Checks: []string{},
Behavior: consulapi.SessionBehaviorDelete, Behavior: api.SessionBehaviorDelete,
TTL: rExecTTL, TTL: rExecTTL,
} }
id, _, err := session.CreateNoChecks(&se, nil) id, _, err := session.CreateNoChecks(&se, nil)
@ -527,8 +550,8 @@ func (c *ExecCommand) createSessionForeign() (string, error) {
// renewSession is a long running routine that periodically renews // renewSession is a long running routine that periodically renews
// the session TTL. This is used for foreign sessions where we depend // the session TTL. This is used for foreign sessions where we depend
// on TTLs. // on TTLs.
func (c *ExecCommand) renewSession(id string, stopCh chan struct{}) { func (c *cmd) renewSession(id string, stopCh chan struct{}) {
session := c.client.Session() session := c.apiclient.Session()
for { for {
select { select {
case <-time.After(rExecRenewInterval): case <-time.After(rExecRenewInterval):
@ -544,7 +567,7 @@ func (c *ExecCommand) renewSession(id string, stopCh chan struct{}) {
} }
// destroySession is used to destroy the associated session // destroySession is used to destroy the associated session
func (c *ExecCommand) destroySession() error { func (c *cmd) destroySession() error {
// Stop the session renew if any // Stop the session renew if any
if c.stopCh != nil { if c.stopCh != nil {
close(c.stopCh) close(c.stopCh)
@ -552,7 +575,7 @@ func (c *ExecCommand) destroySession() error {
} }
// Destroy the session explicitly // Destroy the session explicitly
session := c.client.Session() session := c.apiclient.Session()
_, err := session.Destroy(c.sessionID, nil) _, err := session.Destroy(c.sessionID, nil)
return err return err
} }
@ -560,7 +583,7 @@ func (c *ExecCommand) destroySession() error {
// makeRExecSpec creates a serialized job specification // makeRExecSpec creates a serialized job specification
// that can be uploaded which will be parsed by agents to // that can be uploaded which will be parsed by agents to
// determine what to do. // determine what to do.
func (c *ExecCommand) makeRExecSpec() ([]byte, error) { func (c *cmd) makeRExecSpec() ([]byte, error) {
spec := &rExecSpec{ spec := &rExecSpec{
Command: c.conf.cmd, Command: c.conf.cmd,
Args: c.conf.args, Args: c.conf.args,
@ -571,9 +594,9 @@ func (c *ExecCommand) makeRExecSpec() ([]byte, error) {
} }
// uploadPayload is used to upload the request payload // uploadPayload is used to upload the request payload
func (c *ExecCommand) uploadPayload(payload []byte) error { func (c *cmd) uploadPayload(payload []byte) error {
kv := c.client.KV() kv := c.apiclient.KV()
pair := consulapi.KVPair{ pair := api.KVPair{
Key: path.Join(c.conf.prefix, c.sessionID, rExecFileName), Key: path.Join(c.conf.prefix, c.sessionID, rExecFileName),
Value: payload, Value: payload,
Session: c.sessionID, Session: c.sessionID,
@ -591,8 +614,8 @@ func (c *ExecCommand) uploadPayload(payload []byte) error {
// destroyData is used to nuke all the data associated with // destroyData is used to nuke all the data associated with
// this remote exec. We just do a recursive delete of our // this remote exec. We just do a recursive delete of our
// data directory. // data directory.
func (c *ExecCommand) destroyData() error { func (c *cmd) destroyData() error {
kv := c.client.KV() kv := c.apiclient.KV()
dir := path.Join(c.conf.prefix, c.sessionID) dir := path.Join(c.conf.prefix, c.sessionID)
_, err := kv.DeleteTree(dir, nil) _, err := kv.DeleteTree(dir, nil)
return err return err
@ -600,7 +623,7 @@ func (c *ExecCommand) destroyData() error {
// fireEvent is used to fire the event that will notify nodes // fireEvent is used to fire the event that will notify nodes
// about the remote execution. Returns the event ID or error // about the remote execution. Returns the event ID or error
func (c *ExecCommand) fireEvent() (string, error) { func (c *cmd) fireEvent() (string, error) {
// Create the user event payload // Create the user event payload
msg := &rExecEvent{ msg := &rExecEvent{
Prefix: c.conf.prefix, Prefix: c.conf.prefix,
@ -612,8 +635,8 @@ func (c *ExecCommand) fireEvent() (string, error) {
} }
// Format the user event // Format the user event
event := c.client.Event() event := c.apiclient.Event()
params := &consulapi.UserEvent{ params := &api.UserEvent{
Name: "_rexec", Name: "_rexec",
Payload: buf, Payload: buf,
NodeFilter: c.conf.node, NodeFilter: c.conf.node,
@ -626,23 +649,6 @@ func (c *ExecCommand) fireEvent() (string, error) {
return id, err return id, err
} }
func (c *ExecCommand) Synopsis() string {
return "Executes a command on Consul nodes"
}
func (c *ExecCommand) Help() string {
c.initFlags()
return c.HelpCommand(`
Usage: consul exec [options] [-|command...]
Evaluates a command on remote Consul nodes. The nodes responding can
be filtered using regular expressions on node name, service, and tag
definitions. If a command is '-', stdin will be read until EOF
and used as a script input.
`)
}
// TargetedUI is a UI that wraps another UI implementation and modifies // TargetedUI is a UI that wraps another UI implementation and modifies
// the output to indicate a specific target. Specifically, all Say output // the output to indicate a specific target. Specifically, all Say output
// is prefixed with the target name. Message output is not prefixed but // is prefixed with the target name. Message output is not prefixed but

View File

@ -1,4 +1,4 @@
package command package exec
import ( import (
"strings" "strings"
@ -11,21 +11,12 @@ import (
"github.com/mitchellh/cli" "github.com/mitchellh/cli"
) )
func testExecCommand(t *testing.T) (*cli.MockUi, *ExecCommand) { func TestExecCommand_noTabs(t *testing.T) {
ui := cli.NewMockUi() if strings.ContainsRune(New(nil, nil).Help(), '\t') {
return ui, &ExecCommand{ t.Fatal("usage has tabs")
BaseCommand: BaseCommand{
UI: ui,
Flags: FlagSetHTTP,
},
} }
} }
func TestExecCommand_implements(t *testing.T) {
t.Parallel()
var _ cli.Command = &ExecCommand{}
}
func TestExecCommandRun(t *testing.T) { func TestExecCommandRun(t *testing.T) {
t.Parallel() t.Parallel()
a := agent.NewTestAgent(t.Name(), ` a := agent.NewTestAgent(t.Name(), `
@ -33,7 +24,8 @@ func TestExecCommandRun(t *testing.T) {
`) `)
defer a.Shutdown() defer a.Shutdown()
ui, c := testExecCommand(t) ui := cli.NewMockUi()
c := New(ui, nil)
args := []string{"-http-addr=" + a.HTTPAddr(), "-wait=1s", "uptime"} args := []string{"-http-addr=" + a.HTTPAddr(), "-wait=1s", "uptime"}
code := c.Run(args) code := c.Run(args)
@ -53,7 +45,8 @@ func TestExecCommandRun_NoShell(t *testing.T) {
`) `)
defer a.Shutdown() defer a.Shutdown()
ui, c := testExecCommand(t) ui := cli.NewMockUi()
c := New(ui, nil)
args := []string{"-http-addr=" + a.HTTPAddr(), "-shell=false", "-wait=1s", "uptime"} args := []string{"-http-addr=" + a.HTTPAddr(), "-shell=false", "-wait=1s", "uptime"}
code := c.Run(args) code := c.Run(args)
@ -94,7 +87,8 @@ func TestExecCommandRun_CrossDC(t *testing.T) {
} }
}) })
ui, c := testExecCommand(t) ui := cli.NewMockUi()
c := New(ui, nil)
args := []string{"-http-addr=" + a1.HTTPAddr(), "-wait=500ms", "-datacenter=dc2", "uptime"} args := []string{"-http-addr=" + a1.HTTPAddr(), "-wait=500ms", "-datacenter=dc2", "uptime"}
code := c.Run(args) code := c.Run(args)
@ -150,16 +144,16 @@ func TestExecCommand_Sessions(t *testing.T) {
`) `)
defer a.Shutdown() defer a.Shutdown()
client := a.Client() ui := cli.NewMockUi()
_, c := testExecCommand(t) c := New(ui, nil)
c.client = client c.apiclient = a.Client()
id, err := c.createSession() id, err := c.createSession()
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
se, _, err := client.Session().Info(id, nil) se, _, err := a.Client().Session().Info(id, nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -173,7 +167,7 @@ func TestExecCommand_Sessions(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
se, _, err = client.Session().Info(id, nil) se, _, err = a.Client().Session().Info(id, nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -189,9 +183,9 @@ func TestExecCommand_Sessions_Foreign(t *testing.T) {
`) `)
defer a.Shutdown() defer a.Shutdown()
client := a.Client() ui := cli.NewMockUi()
_, c := testExecCommand(t) c := New(ui, nil)
c.client = client c.apiclient = a.Client()
c.conf.foreignDC = true c.conf.foreignDC = true
c.conf.localDC = "dc1" c.conf.localDC = "dc1"
@ -209,7 +203,7 @@ func TestExecCommand_Sessions_Foreign(t *testing.T) {
} }
}) })
se, _, err := client.Session().Info(id, nil) se, _, err := a.Client().Session().Info(id, nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -223,7 +217,7 @@ func TestExecCommand_Sessions_Foreign(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
se, _, err = client.Session().Info(id, nil) se, _, err = a.Client().Session().Info(id, nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -239,9 +233,9 @@ func TestExecCommand_UploadDestroy(t *testing.T) {
`) `)
defer a.Shutdown() defer a.Shutdown()
client := a.Client() ui := cli.NewMockUi()
_, c := testExecCommand(t) c := New(ui, nil)
c.client = client c.apiclient = a.Client()
id, err := c.createSession() id, err := c.createSession()
if err != nil { if err != nil {
@ -263,7 +257,7 @@ func TestExecCommand_UploadDestroy(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
pair, _, err := client.KV().Get("_rexec/"+id+"/job", nil) pair, _, err := a.Client().KV().Get("_rexec/"+id+"/job", nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -277,7 +271,7 @@ func TestExecCommand_UploadDestroy(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
pair, _, err = client.KV().Get("_rexec/"+id+"/job", nil) pair, _, err = a.Client().KV().Get("_rexec/"+id+"/job", nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -294,9 +288,9 @@ func TestExecCommand_StreamResults(t *testing.T) {
`) `)
defer a.Shutdown() defer a.Shutdown()
client := a.Client() ui := cli.NewMockUi()
_, c := testExecCommand(t) c := New(ui, nil)
c.client = client c.apiclient = a.Client()
c.conf.prefix = "_rexec" c.conf.prefix = "_rexec"
id, err := c.createSession() id, err := c.createSession()
@ -315,7 +309,7 @@ func TestExecCommand_StreamResults(t *testing.T) {
go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh) go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh)
prefix := "_rexec/" + id + "/" prefix := "_rexec/" + id + "/"
ok, _, err := client.KV().Acquire(&consulapi.KVPair{ ok, _, err := a.Client().KV().Acquire(&consulapi.KVPair{
Key: prefix + "foo/ack", Key: prefix + "foo/ack",
Session: id, Session: id,
}, nil) }, nil)
@ -335,7 +329,7 @@ func TestExecCommand_StreamResults(t *testing.T) {
t.Fatalf("timeout") t.Fatalf("timeout")
} }
ok, _, err = client.KV().Acquire(&consulapi.KVPair{ ok, _, err = a.Client().KV().Acquire(&consulapi.KVPair{
Key: prefix + "foo/exit", Key: prefix + "foo/exit",
Value: []byte("127"), Value: []byte("127"),
Session: id, Session: id,
@ -357,7 +351,7 @@ func TestExecCommand_StreamResults(t *testing.T) {
} }
// Random key, should ignore // Random key, should ignore
ok, _, err = client.KV().Acquire(&consulapi.KVPair{ ok, _, err = a.Client().KV().Acquire(&consulapi.KVPair{
Key: prefix + "foo/random", Key: prefix + "foo/random",
Session: id, Session: id,
}, nil) }, nil)
@ -369,7 +363,7 @@ func TestExecCommand_StreamResults(t *testing.T) {
} }
// Output heartbeat // Output heartbeat
ok, _, err = client.KV().Acquire(&consulapi.KVPair{ ok, _, err = a.Client().KV().Acquire(&consulapi.KVPair{
Key: prefix + "foo/out/00000", Key: prefix + "foo/out/00000",
Session: id, Session: id,
}, nil) }, nil)
@ -390,7 +384,7 @@ func TestExecCommand_StreamResults(t *testing.T) {
} }
// Output value // Output value
ok, _, err = client.KV().Acquire(&consulapi.KVPair{ ok, _, err = a.Client().KV().Acquire(&consulapi.KVPair{
Key: prefix + "foo/out/00001", Key: prefix + "foo/out/00001",
Value: []byte("test"), Value: []byte("test"),
Session: id, Session: id,