Merge pull request #584 from hashicorp/f-exec-dc
Support "exec" in foreign datacenters
This commit is contained in:
commit
b10f9e8d8a
|
@ -4,6 +4,17 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// SessionBehaviorRelease is the default behavior and causes
|
||||
// all associated locks to be released on session invalidation.
|
||||
SessionBehaviorRelease = "release"
|
||||
|
||||
// SessionBehaviorDelete is new in Consul 0.5 and changes the
|
||||
// behavior to delete all associated locks on session invalidation.
|
||||
// It can be used in a way similar to Ephemeral Nodes in ZooKeeper.
|
||||
SessionBehaviorDelete = "delete"
|
||||
)
|
||||
|
||||
// SessionEntry represents a session in consul
|
||||
type SessionEntry struct {
|
||||
CreateIndex uint64
|
||||
|
|
102
command/exec.go
102
command/exec.go
|
@ -42,6 +42,13 @@ const (
|
|||
// rExecQuietWait is how long we wait for no responses
|
||||
// before assuming the job is done.
|
||||
rExecQuietWait = 2 * time.Second
|
||||
|
||||
// rExecTTL is how long we default the session TTL to
|
||||
rExecTTL = "15s"
|
||||
|
||||
// rExecRenewInterval is how often we renew the session TTL
|
||||
// when doing an exec in a foreign DC.
|
||||
rExecRenewInterval = 5 * time.Second
|
||||
)
|
||||
|
||||
// rExecConf is used to pass around configuration
|
||||
|
@ -49,6 +56,10 @@ type rExecConf struct {
|
|||
datacenter string
|
||||
prefix string
|
||||
|
||||
foreignDC bool
|
||||
localDC string
|
||||
localNode string
|
||||
|
||||
node string
|
||||
service string
|
||||
tag string
|
||||
|
@ -111,6 +122,7 @@ type ExecCommand struct {
|
|||
conf rExecConf
|
||||
client *consulapi.Client
|
||||
sessionID string
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
func (c *ExecCommand) Run(args []string) int {
|
||||
|
@ -166,13 +178,23 @@ func (c *ExecCommand) Run(args []string) int {
|
|||
c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
|
||||
return 1
|
||||
}
|
||||
_, err = client.Agent().NodeName()
|
||||
info, err := client.Agent().Self()
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err))
|
||||
return 1
|
||||
}
|
||||
c.client = client
|
||||
|
||||
// Check if this is a foreign datacenter
|
||||
if c.conf.datacenter != "" && c.conf.datacenter != info["Config"]["Datacenter"] {
|
||||
if c.conf.verbose {
|
||||
c.Ui.Info("Remote exec in foreign datacenter, using Session TTL")
|
||||
}
|
||||
c.conf.foreignDC = true
|
||||
c.conf.localDC = info["Config"]["Datacenter"].(string)
|
||||
c.conf.localNode = info["Config"]["NodeName"].(string)
|
||||
}
|
||||
|
||||
// Create the job spec
|
||||
spec, err := c.makeRExecSpec()
|
||||
if err != nil {
|
||||
|
@ -418,16 +440,92 @@ func (conf *rExecConf) validate() error {
|
|||
|
||||
// createSession is used to create a new session for this command
|
||||
func (c *ExecCommand) createSession() (string, error) {
|
||||
var id string
|
||||
var err error
|
||||
if c.conf.foreignDC {
|
||||
id, err = c.createSessionForeign()
|
||||
} else {
|
||||
id, err = c.createSessionLocal()
|
||||
}
|
||||
if err == nil {
|
||||
c.stopCh = make(chan struct{})
|
||||
go c.renewSession(id, c.stopCh)
|
||||
}
|
||||
return id, err
|
||||
}
|
||||
|
||||
// createSessionLocal is used to create a new session in a local datacenter
|
||||
// This is simpler since we can use the local agent to create the session.
|
||||
func (c *ExecCommand) createSessionLocal() (string, error) {
|
||||
session := c.client.Session()
|
||||
se := consulapi.SessionEntry{
|
||||
Name: "Remote Exec",
|
||||
Name: "Remote Exec",
|
||||
Behavior: consulapi.SessionBehaviorDelete,
|
||||
TTL: rExecTTL,
|
||||
}
|
||||
id, _, err := session.Create(&se, nil)
|
||||
return id, err
|
||||
}
|
||||
|
||||
// createSessionLocal is used to create a new session in a foreign datacenter
|
||||
// This is more complex since the local agent cannot be used to create
|
||||
// a session, and we must associate with a node in the remote datacenter.
|
||||
func (c *ExecCommand) createSessionForeign() (string, error) {
|
||||
// Look for a remote node to bind to
|
||||
health := c.client.Health()
|
||||
services, _, err := health.Service("consul", "", true, nil)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Failed to find Consul server in remote datacenter: %v", err)
|
||||
}
|
||||
if len(services) == 0 {
|
||||
return "", fmt.Errorf("Failed to find Consul server in remote datacenter")
|
||||
}
|
||||
node := services[0].Node.Node
|
||||
if c.conf.verbose {
|
||||
c.Ui.Info(fmt.Sprintf("Binding session to remote node %s@%s",
|
||||
node, c.conf.datacenter))
|
||||
}
|
||||
|
||||
session := c.client.Session()
|
||||
se := consulapi.SessionEntry{
|
||||
Name: fmt.Sprintf("Remote Exec via %s@%s", c.conf.localNode, c.conf.localDC),
|
||||
Node: node,
|
||||
Checks: []string{},
|
||||
Behavior: consulapi.SessionBehaviorDelete,
|
||||
TTL: rExecTTL,
|
||||
}
|
||||
id, _, err := session.CreateNoChecks(&se, nil)
|
||||
return id, err
|
||||
}
|
||||
|
||||
// renewSession is a long running routine that periodically renews
|
||||
// the session TTL. This is used for foreign sessions where we depend
|
||||
// on TTLs.
|
||||
func (c *ExecCommand) renewSession(id string, stopCh chan struct{}) {
|
||||
session := c.client.Session()
|
||||
for {
|
||||
select {
|
||||
case <-time.After(rExecRenewInterval):
|
||||
_, _, err := session.Renew(id, nil)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Session renew failed: %v", err))
|
||||
return
|
||||
}
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// destroySession is used to destroy the associated session
|
||||
func (c *ExecCommand) destroySession() error {
|
||||
// Stop the session renew if any
|
||||
if c.stopCh != nil {
|
||||
close(c.stopCh)
|
||||
c.stopCh = nil
|
||||
}
|
||||
|
||||
// Destroy the session explicitly
|
||||
session := c.client.Session()
|
||||
_, err := session.Destroy(c.sessionID, nil)
|
||||
return err
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
consulapi "github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/command/agent"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
@ -33,6 +35,43 @@ func TestExecCommandRun(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestExecCommandRun_CrossDC(t *testing.T) {
|
||||
a1 := testAgent(t)
|
||||
defer a1.Shutdown()
|
||||
|
||||
a2 := testAgentWithConfig(t, func(c *agent.Config) {
|
||||
c.Datacenter = "dc2"
|
||||
})
|
||||
defer a2.Shutdown()
|
||||
|
||||
// Join over the WAN
|
||||
wanAddr := fmt.Sprintf("%s:%d", a1.config.BindAddr, a1.config.Ports.SerfWan)
|
||||
n, err := a2.agent.JoinWAN([]string{wanAddr})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if n != 1 {
|
||||
t.Fatalf("bad %d", n)
|
||||
}
|
||||
|
||||
waitForLeader(t, a1.httpAddr)
|
||||
waitForLeader(t, a2.httpAddr)
|
||||
|
||||
ui := new(cli.MockUi)
|
||||
c := &ExecCommand{Ui: ui}
|
||||
args := []string{"-http-addr=" + a1.httpAddr,
|
||||
"-wait=400ms", "-datacenter=dc2", "uptime"}
|
||||
|
||||
code := c.Run(args)
|
||||
if code != 0 {
|
||||
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
|
||||
}
|
||||
|
||||
if !strings.Contains(ui.OutputWriter.String(), "load") {
|
||||
t.Fatalf("bad: %#v", ui.OutputWriter.String())
|
||||
}
|
||||
}
|
||||
|
||||
func waitForLeader(t *testing.T, httpAddr string) {
|
||||
client, err := HTTPClient(httpAddr)
|
||||
if err != nil {
|
||||
|
@ -128,6 +167,60 @@ func TestExecCommand_Sessions(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestExecCommand_Sessions_Foreign(t *testing.T) {
|
||||
a1 := testAgent(t)
|
||||
defer a1.Shutdown()
|
||||
waitForLeader(t, a1.httpAddr)
|
||||
|
||||
client, err := HTTPClient(a1.httpAddr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
ui := new(cli.MockUi)
|
||||
c := &ExecCommand{
|
||||
Ui: ui,
|
||||
client: client,
|
||||
}
|
||||
|
||||
c.conf.foreignDC = true
|
||||
c.conf.localDC = "dc1"
|
||||
c.conf.localNode = "foo"
|
||||
|
||||
var id string
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
id, err = c.createSession()
|
||||
if err != nil && strings.Contains(err.Error(), "Failed to find Consul server") {
|
||||
err = nil
|
||||
}
|
||||
return id != "", err
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
|
||||
se, _, err := client.Session().Info(id, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if se == nil || se.Name != "Remote Exec via foo@dc1" {
|
||||
t.Fatalf("bad: %v", se)
|
||||
}
|
||||
|
||||
c.sessionID = id
|
||||
err = c.destroySession()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
se, _, err = client.Session().Info(id, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if se != nil {
|
||||
t.Fatalf("bad: %v", se)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecCommand_UploadDestroy(t *testing.T) {
|
||||
a1 := testAgent(t)
|
||||
defer a1.Shutdown()
|
||||
|
|
Loading…
Reference in New Issue