Properly interpolate services on updated tasks
Previously was interpolating the original task's services again. Fixes #2180 Also fixes a slight memory leak in the new consul agent. Script check handles weren't being deleted after cancellation.
This commit is contained in:
parent
3027335099
commit
cafefa049b
|
@ -1442,7 +1442,7 @@ func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, ol
|
||||||
// Allow set the script executor if the driver supports it
|
// Allow set the script executor if the driver supports it
|
||||||
exec = h
|
exec = h
|
||||||
}
|
}
|
||||||
interpolateServices(r.getTaskEnv(), r.task)
|
interpolateServices(r.getTaskEnv(), new)
|
||||||
return r.consul.UpdateTask(r.alloc.ID, old, new, exec)
|
return r.consul.UpdateTask(r.alloc.ID, old, new, exec)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -221,6 +221,7 @@ func (c *ServiceClient) merge(ops *operations) {
|
||||||
if script, ok := c.runningScripts[cid]; ok {
|
if script, ok := c.runningScripts[cid]; ok {
|
||||||
script.cancel()
|
script.cancel()
|
||||||
delete(c.scripts, cid)
|
delete(c.scripts, cid)
|
||||||
|
delete(c.runningScripts, cid)
|
||||||
}
|
}
|
||||||
delete(c.checks, cid)
|
delete(c.checks, cid)
|
||||||
}
|
}
|
||||||
|
@ -673,14 +674,15 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host
|
||||||
|
|
||||||
switch check.Type {
|
switch check.Type {
|
||||||
case structs.ServiceCheckHTTP:
|
case structs.ServiceCheckHTTP:
|
||||||
if check.Protocol == "" {
|
proto := check.Protocol
|
||||||
check.Protocol = "http"
|
if proto == "" {
|
||||||
|
proto = "http"
|
||||||
}
|
}
|
||||||
if check.TLSSkipVerify {
|
if check.TLSSkipVerify {
|
||||||
chkReg.TLSSkipVerify = true
|
chkReg.TLSSkipVerify = true
|
||||||
}
|
}
|
||||||
base := url.URL{
|
base := url.URL{
|
||||||
Scheme: check.Protocol,
|
Scheme: proto,
|
||||||
Host: net.JoinHostPort(host, strconv.Itoa(port)),
|
Host: net.JoinHostPort(host, strconv.Itoa(port)),
|
||||||
}
|
}
|
||||||
relative, err := url.Parse(check.Path)
|
relative, err := url.Parse(check.Path)
|
||||||
|
|
|
@ -767,3 +767,90 @@ func TestConsul_NoTLSSkipVerifySupport(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestConsul_RemoveScript assert removing a script check removes all objects
|
||||||
|
// related to that check.
|
||||||
|
func TestConsul_CancelScript(t *testing.T) {
|
||||||
|
ctx := setupFake()
|
||||||
|
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
||||||
|
{
|
||||||
|
Name: "scriptcheckDel",
|
||||||
|
Type: "script",
|
||||||
|
Interval: 9000 * time.Hour,
|
||||||
|
Timeout: 9000 * time.Hour,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "scriptcheckKeep",
|
||||||
|
Type: "script",
|
||||||
|
Interval: 9000 * time.Hour,
|
||||||
|
Timeout: 9000 * time.Hour,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx); err != nil {
|
||||||
|
t.Fatalf("unexpected error registering task: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ctx.syncOnce(); err != nil {
|
||||||
|
t.Fatalf("unexpected error syncing task: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(ctx.FakeConsul.checks) != 2 {
|
||||||
|
t.Errorf("expected 2 checks but found %d", len(ctx.FakeConsul.checks))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(ctx.ServiceClient.scripts) != 2 && len(ctx.ServiceClient.runningScripts) != 2 {
|
||||||
|
t.Errorf("expected 2 running script but found scripts=%d runningScripts=%d",
|
||||||
|
len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts))
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
select {
|
||||||
|
case <-ctx.execs:
|
||||||
|
// Script ran as expected!
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatalf("timed out waiting for script check to run")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove a check and update the task
|
||||||
|
origTask := ctx.Task.Copy()
|
||||||
|
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
||||||
|
{
|
||||||
|
Name: "scriptcheckKeep",
|
||||||
|
Type: "script",
|
||||||
|
Interval: 9000 * time.Hour,
|
||||||
|
Timeout: 9000 * time.Hour,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ctx.ServiceClient.UpdateTask("allocid", origTask, ctx.Task, ctx); err != nil {
|
||||||
|
t.Fatalf("unexpected error registering task: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ctx.syncOnce(); err != nil {
|
||||||
|
t.Fatalf("unexpected error syncing task: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(ctx.FakeConsul.checks) != 1 {
|
||||||
|
t.Errorf("expected 1 check but found %d", len(ctx.FakeConsul.checks))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(ctx.ServiceClient.scripts) != 1 && len(ctx.ServiceClient.runningScripts) != 1 {
|
||||||
|
t.Errorf("expected 1 running script but found scripts=%d runningScripts=%d",
|
||||||
|
len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure exec wasn't called again
|
||||||
|
select {
|
||||||
|
case <-ctx.execs:
|
||||||
|
t.Errorf("unexpected execution of script; was goroutine not cancelled?")
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
// No unexpected script execs
|
||||||
|
}
|
||||||
|
|
||||||
|
// Don't leak goroutines
|
||||||
|
for _, scriptHandle := range ctx.ServiceClient.runningScripts {
|
||||||
|
scriptHandle.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -2088,7 +2088,6 @@ func (tg *TaskGroup) GoString() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// TODO add Consul TTL check
|
|
||||||
ServiceCheckHTTP = "http"
|
ServiceCheckHTTP = "http"
|
||||||
ServiceCheckTCP = "tcp"
|
ServiceCheckTCP = "tcp"
|
||||||
ServiceCheckScript = "script"
|
ServiceCheckScript = "script"
|
||||||
|
|
Loading…
Reference in a new issue