diff --git a/command/exec.go b/command/exec.go index ac42da5af..d1bddd3f4 100644 --- a/command/exec.go +++ b/command/exec.go @@ -42,6 +42,14 @@ const ( // rExecQuietWait is how long we wait for no responses // before assuming the job is done. rExecQuietWait = 2 * time.Second + + // rExecForeignTTL is how long we default the session TTL + // to when doing an exec in a foreign DC. + rExecForeignTTL = "15s" + + // rExecRenewInterval is how often we renew the session TTL + // when doing an exec in a foreign DC. + rExecRenewInterval = 5 * time.Second ) // rExecConf is used to pass around configuration @@ -49,6 +57,10 @@ type rExecConf struct { datacenter string prefix string + foreignDC bool + localDC string + localNode string + node string service string tag string @@ -111,6 +123,7 @@ type ExecCommand struct { conf rExecConf client *consulapi.Client sessionID string + stopCh chan struct{} } func (c *ExecCommand) Run(args []string) int { @@ -166,13 +179,23 @@ func (c *ExecCommand) Run(args []string) int { c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) return 1 } - _, err = client.Agent().NodeName() + info, err := client.Agent().Self() if err != nil { c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err)) return 1 } c.client = client + // Check if this is a foreign datacenter + if c.conf.datacenter != "" && c.conf.datacenter != info["Config"]["Datacenter"] { + if c.conf.verbose { + c.Ui.Info("Remote exec in foreign datacenter, using Session TTL") + } + c.conf.foreignDC = true + c.conf.localDC = info["Config"]["Datacenter"].(string) + c.conf.localNode = info["Config"]["NodeName"].(string) + } + // Create the job spec spec, err := c.makeRExecSpec() if err != nil { @@ -418,16 +441,88 @@ func (conf *rExecConf) validate() error { // createSession is used to create a new session for this command func (c *ExecCommand) createSession() (string, error) { + if c.conf.foreignDC { + id, err := c.createSessionForeign() + if err == nil { + c.stopCh = make(chan struct{}) + go c.renewSession(id, c.stopCh) + } + return id, err + } + return c.createSessionLocal() +} + +// createSessionLocal is used to create a new session in a local datacenter +// This is simpler since we can use the local agent to create the session. +func (c *ExecCommand) createSessionLocal() (string, error) { session := c.client.Session() se := consulapi.SessionEntry{ - Name: "Remote Exec", + Name: "Remote Exec", + Behavior: consulapi.SessionBehaviorDelete, } id, _, err := session.Create(&se, nil) return id, err } +// createSessionLocal is used to create a new session in a foreign datacenter +// This is more complex since the local agent cannot be used to create +// a session, and we must associate with a node in the remote datacenter. +func (c *ExecCommand) createSessionForeign() (string, error) { + // Look for a remote node to bind to + health := c.client.Health() + services, _, err := health.Service("consul", "", true, nil) + if err != nil { + return "", fmt.Errorf("Failed to find Consul server in remote datacenter: %v", err) + } + if len(services) == 0 { + return "", fmt.Errorf("Failed to find Consul server in remote datacenter") + } + node := services[0].Node.Node + if c.conf.verbose { + c.Ui.Info(fmt.Sprintf("Binding session to remote node %s@%s", + node, c.conf.datacenter)) + } + + session := c.client.Session() + se := consulapi.SessionEntry{ + Name: fmt.Sprintf("Remote Exec via %s@%s", c.conf.localNode, c.conf.localDC), + Node: node, + Checks: []string{}, + Behavior: consulapi.SessionBehaviorDelete, + TTL: rExecForeignTTL, + } + id, _, err := session.CreateNoChecks(&se, nil) + return id, err +} + +// renewSession is a long running routine that periodically renews +// the session TTL. This is used for foreign sessions where we depend +// on TTLs. +func (c *ExecCommand) renewSession(id string, stopCh chan struct{}) { + session := c.client.Session() + for { + select { + case <-time.After(rExecRenewInterval): + _, _, err := session.Renew(id, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Session renew failed: %v", err)) + return + } + case <-stopCh: + return + } + } +} + // destroySession is used to destroy the associated session func (c *ExecCommand) destroySession() error { + // Stop the session renew if any + if c.stopCh != nil { + close(c.stopCh) + c.stopCh = nil + } + + // Destroy the session explicitly session := c.client.Session() _, err := session.Destroy(c.sessionID, nil) return err diff --git a/command/exec_test.go b/command/exec_test.go index c23f9afa2..0e259e536 100644 --- a/command/exec_test.go +++ b/command/exec_test.go @@ -1,11 +1,13 @@ package command import ( + "fmt" "strings" "testing" "time" consulapi "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/command/agent" "github.com/hashicorp/consul/testutil" "github.com/mitchellh/cli" ) @@ -33,6 +35,43 @@ func TestExecCommandRun(t *testing.T) { } } +func TestExecCommandRun_CrossDC(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + + a2 := testAgentWithConfig(t, func(c *agent.Config) { + c.Datacenter = "dc2" + }) + defer a2.Shutdown() + + // Join over the WAN + wanAddr := fmt.Sprintf("%s:%d", a1.config.BindAddr, a1.config.Ports.SerfWan) + n, err := a2.agent.JoinWAN([]string{wanAddr}) + if err != nil { + t.Fatalf("err: %v", err) + } + if n != 1 { + t.Fatalf("bad %d", n) + } + + waitForLeader(t, a1.httpAddr) + waitForLeader(t, a2.httpAddr) + + ui := new(cli.MockUi) + c := &ExecCommand{Ui: ui} + args := []string{"-http-addr=" + a1.httpAddr, + "-wait=400ms", "-datacenter=dc2", "uptime"} + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + + if !strings.Contains(ui.OutputWriter.String(), "load") { + t.Fatalf("bad: %#v", ui.OutputWriter.String()) + } +} + func waitForLeader(t *testing.T, httpAddr string) { client, err := HTTPClient(httpAddr) if err != nil { @@ -128,6 +167,60 @@ func TestExecCommand_Sessions(t *testing.T) { } } +func TestExecCommand_Sessions_Foreign(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + client, err := HTTPClient(a1.httpAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + ui := new(cli.MockUi) + c := &ExecCommand{ + Ui: ui, + client: client, + } + + c.conf.foreignDC = true + c.conf.localDC = "dc1" + c.conf.localNode = "foo" + + var id string + testutil.WaitForResult(func() (bool, error) { + id, err = c.createSession() + if err != nil && strings.Contains(err.Error(), "Failed to find Consul server") { + err = nil + } + return id != "", err + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + se, _, err := client.Session().Info(id, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if se == nil || se.Name != "Remote Exec via foo@dc1" { + t.Fatalf("bad: %v", se) + } + + c.sessionID = id + err = c.destroySession() + if err != nil { + t.Fatalf("err: %v", err) + } + + se, _, err = client.Session().Info(id, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if se != nil { + t.Fatalf("bad: %v", se) + } +} + func TestExecCommand_UploadDestroy(t *testing.T) { a1 := testAgent(t) defer a1.Shutdown()