// Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 package command import ( "context" "fmt" "net/http" "net/http/httptest" "net/http/httputil" neturl "net/url" "regexp" "sort" "strings" "sync/atomic" "testing" "time" "github.com/google/go-cmp/cmp/cmpopts" "github.com/hashicorp/go-set" "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/command/agent" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/testutil" "github.com/mitchellh/cli" "github.com/shoenig/test/must" "github.com/shoenig/test/wait" ) func TestJobRestartCommand_Implements(t *testing.T) { ci.Parallel(t) var _ cli.Command = &JobRestartCommand{} } func TestJobRestartCommand_parseAndValidate(t *testing.T) { ci.Parallel(t) testCases := []struct { name string args []string expectedErr string expectedCmd *JobRestartCommand }{ { name: "missing job", args: []string{}, expectedErr: "This command takes one argument", }, { name: "too many args", args: []string{"one", "two", "three"}, expectedErr: "This command takes one argument", }, { name: "tasks and groups", args: []string{ "-task", "my-task-1", "-task", "my-task-2", "-group", "my-group-1", "-group", "my-group-2", "my-job", }, expectedCmd: &JobRestartCommand{ jobID: "my-job", groups: set.From([]string{"my-group-1", "my-group-2"}), tasks: set.From([]string{"my-task-1", "my-task-2"}), batchSize: 1, }, }, { name: "all tasks", args: []string{"-all-tasks", "my-job"}, expectedCmd: &JobRestartCommand{ jobID: "my-job", allTasks: true, batchSize: 1, }, }, { name: "all tasks conflicts with task", args: []string{"-all-tasks", "-task", "my-task", "-yes", "my-job"}, expectedErr: "The -all-tasks option cannot be used with -task", }, { name: "batch size as number", args: []string{"-batch-size", "10", "my-job"}, expectedCmd: &JobRestartCommand{ jobID: "my-job", batchSize: 10, }, }, { name: "batch size as percentage", args: []string{"-batch-size", "10%", "my-job"}, expectedCmd: &JobRestartCommand{ jobID: "my-job", batchSize: 10, batchSizePercent: true, }, }, { name: "batch size not valid", args: []string{"-batch-size", "not-valid", "my-job"}, expectedErr: "Invalid -batch-size value", }, { name: "batch size decimal not valid", args: []string{"-batch-size", "1.5", "my-job"}, expectedErr: "Invalid -batch-size value", }, { name: "batch size zero", args: []string{"-batch-size", "0", "my-job"}, expectedErr: "Invalid -batch-size value", }, { name: "batch size decimal percent not valid", args: []string{"-batch-size", "1.5%", "my-job"}, expectedErr: "Invalid -batch-size value", }, { name: "batch size zero percentage", args: []string{"-batch-size", "0%", "my-job"}, expectedErr: "Invalid -batch-size value", }, { name: "batch size with multiple numbers and percentages", args: []string{"-batch-size", "15%10%", "my-job"}, expectedErr: "Invalid -batch-size value", }, { name: "batch wait ask", args: []string{"-batch-wait", "ask", "my-job"}, expectedErr: "terminal is not interactive", // Can't test non-interactive. }, { name: "batch wait duration", args: []string{"-batch-wait", "10s", "my-job"}, expectedCmd: &JobRestartCommand{ jobID: "my-job", batchSize: 1, batchWait: 10 * time.Second, }, }, { name: "batch wait invalid", args: []string{"-batch-wait", "10", "my-job"}, expectedErr: "Invalid -batch-wait value", }, { name: "on error fail", args: []string{"-on-error", "fail", "my-job"}, expectedCmd: &JobRestartCommand{ jobID: "my-job", batchSize: 1, onError: jobRestartOnErrorFail, }, }, { name: "on error invalid", args: []string{"-on-error", "invalid", "my-job"}, expectedErr: "Invalid -on-error value", }, { name: "no shutdown delay", args: []string{"-no-shutdown-delay", "my-job"}, expectedCmd: &JobRestartCommand{ jobID: "my-job", batchSize: 1, noShutdownDelay: true, }, }, { name: "reschedule", args: []string{"-reschedule", "my-job"}, expectedCmd: &JobRestartCommand{ jobID: "my-job", batchSize: 1, reschedule: true, }, }, { name: "reschedule conflicts with task", args: []string{"-reschedule", "-task", "my-task", "-yes", "my-job"}, expectedErr: "The -reschedule option cannot be used with -task", }, { name: "verbose", args: []string{"-verbose", "my-job"}, expectedCmd: &JobRestartCommand{ jobID: "my-job", batchSize: 1, verbose: true, length: fullId, }, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ui := &cli.ConcurrentUi{Ui: cli.NewMockUi()} meta := Meta{Ui: ui} // Set some default values if not defined in test case. if tc.expectedCmd != nil { tc.expectedCmd.Meta = meta if tc.expectedCmd.length == 0 { tc.expectedCmd.length = shortId } if tc.expectedCmd.groups == nil { tc.expectedCmd.groups = set.New[string](0) } if tc.expectedCmd.tasks == nil { tc.expectedCmd.tasks = set.New[string](0) } if tc.expectedCmd.onError == "" { tc.expectedCmd.onError = jobRestartOnErrorAsk tc.expectedCmd.autoYes = true tc.args = append([]string{"-yes"}, tc.args...) } } cmd := &JobRestartCommand{Meta: meta} code, err := cmd.parseAndValidate(tc.args) if tc.expectedErr != "" { must.NonZero(t, code) must.ErrorContains(t, err, tc.expectedErr) } else { must.NoError(t, err) must.Zero(t, code) must.Eq(t, tc.expectedCmd, cmd, must.Cmp(cmpopts.IgnoreFields(JobRestartCommand{}, "Meta", "Meta.Ui"))) } }) } } func TestJobRestartCommand_Run(t *testing.T) { ci.Parallel(t) // Create a job with multiple tasks, groups, and allocations. prestartTask := api.NewTask("prestart", "mock_driver"). SetConfig("run_for", "100ms"). SetConfig("exit_code", 0). SetLifecycle(&api.TaskLifecycle{ Hook: api.TaskLifecycleHookPrestart, Sidecar: false, }) sidecarTask := api.NewTask("sidecar", "mock_driver"). SetConfig("run_for", "1m"). SetConfig("exit_code", 0). SetLifecycle(&api.TaskLifecycle{ Hook: api.TaskLifecycleHookPoststart, Sidecar: true, }) mainTask := api.NewTask("main", "mock_driver"). SetConfig("run_for", "1m"). SetConfig("exit_code", 0) batchJob := api.NewBatchJob("test_job_batch", "test_job_batch", "global", 1). AddDatacenter("dc1"). AddTaskGroup( api.NewTaskGroup("single_task", 3). AddTask(mainTask), ). AddTaskGroup( api.NewTaskGroup("multiple_tasks", 2). AddTask(prestartTask). AddTask(sidecarTask). AddTask(mainTask), ) serviceJob := api.NewServiceJob("test_job_service", "test_job_service", "global", 1). AddDatacenter("dc1"). AddTaskGroup( api.NewTaskGroup("single_task", 3). AddTask(mainTask), ). AddTaskGroup( api.NewTaskGroup("multiple_tasks", 2). AddTask(prestartTask). AddTask(sidecarTask). AddTask(mainTask), ) testCases := []struct { name string args []string // Job arg is added automatically. expectedCode int validateFn func(*testing.T, *api.Client, []*api.AllocationListStub, string, string) }{ { name: "restart only running tasks in all groups by default", args: []string{"-batch-size", "100%"}, validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ "single_task": { "main": true, }, "multiple_tasks": { "prestart": false, "sidecar": true, "main": true, }, }) // Check that allocations restarted in a single batch. batches := getRestartBatches(restarted, []string{"single_task", "multiple_tasks"}, "main") must.Len(t, 5, batches[0]) must.StrContains(t, stdout, "Restarting 1st batch") must.StrNotContains(t, stdout, "restarting the next batch") }, }, { name: "restart specific task in all groups", args: []string{"-batch-size", "100%", "-task", "main"}, validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ "single_task": { "main": true, }, "multiple_tasks": { "prestart": false, "sidecar": false, "main": true, }, }) // Check that allocations restarted in a single batch. batches := getRestartBatches(restarted, []string{"single_task", "multiple_tasks"}, "main") must.Len(t, 5, batches[0]) must.StrContains(t, stdout, "Restarting 1st batch") must.StrNotContains(t, stdout, "restarting the next batch") }, }, { name: "restart multiple tasks in all groups", args: []string{"-batch-size", "100%", "-task", "main", "-task", "sidecar"}, validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ "single_task": { "main": true, }, "multiple_tasks": { "prestart": false, "sidecar": true, "main": true, }, }) // Check that allocations restarted in a single batch. batches := getRestartBatches(restarted, []string{"single_task", "multiple_tasks"}, "main") must.Len(t, 5, batches[0]) must.StrContains(t, stdout, "Restarting 1st batch") must.StrNotContains(t, stdout, "restarting the next batch") }, }, { name: "restart all tasks in all groups", args: []string{"-batch-size", "100%", "-all-tasks"}, validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ "single_task": { "main": true, }, "multiple_tasks": { "prestart": true, "sidecar": true, "main": true, }, }) // Check that allocations restarted in a single batch. batches := getRestartBatches(restarted, []string{"single_task", "multiple_tasks"}, "main") must.Len(t, 5, batches[0]) must.StrContains(t, stdout, "Restarting 1st batch") must.StrNotContains(t, stdout, "restarting the next batch") }, }, { name: "restart running tasks in specific group", args: []string{"-batch-size", "100%", "-group", "single_task"}, validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ "single_task": { "main": true, }, "multiple_tasks": { "prestart": false, "sidecar": false, "main": false, }, }) // Check that allocations restarted in a single batch. batches := getRestartBatches(restarted, []string{"single_task"}, "main") must.Len(t, 3, batches[0]) must.StrContains(t, stdout, "Restarting 1st batch") must.StrNotContains(t, stdout, "restarting the next batch") }, }, { name: "restart specific task that is not running", args: []string{"-batch-size", "100%", "-task", "prestart"}, validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ "single_task": { "main": false, }, "multiple_tasks": { "prestart": false, "sidecar": false, "main": false, }, }) // Check that allocations restarted in a single batch. batches := getRestartBatches(restarted, []string{"single_task"}, "main") must.Len(t, 3, batches[0]) must.StrContains(t, stdout, "Restarting 1st batch") must.StrNotContains(t, stdout, "restarting the next batch") // Check that we have an error message. must.StrContains(t, stderr, "Task not running") }, expectedCode: 1, }, { name: "restart specific task in specific group", args: []string{"-batch-size", "100%", "-task", "main", "-group", "single_task"}, validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ "single_task": { "main": true, }, "multiple_tasks": { "prestart": false, "sidecar": false, "main": false, }, }) // Check that allocations restarted in a single batch. batches := getRestartBatches(restarted, []string{"single_task"}, "main") must.Len(t, 3, batches[0]) must.StrContains(t, stdout, "Restarting 1st batch") must.StrNotContains(t, stdout, "restarting the next batch") }, }, { name: "restart multiple tasks in specific group", args: []string{"-batch-size", "100%", "-task", "main", "-task", "sidecar", "-group", "multiple_tasks"}, validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ "single_task": { "main": false, }, "multiple_tasks": { "prestart": false, "sidecar": true, "main": true, }, }) // Check that allocations restarted in a single batch. batches := getRestartBatches(restarted, []string{"multiple_tasks"}, "main") must.Len(t, 2, batches[0]) must.StrContains(t, stdout, "Restarting 1st batch") must.StrNotContains(t, stdout, "restarting the next batch") }, }, { name: "restart all tasks in specific group", args: []string{"-batch-size", "100%", "-all-tasks", "-group", "multiple_tasks"}, validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ "single_task": { "main": false, }, "multiple_tasks": { "prestart": true, "sidecar": true, "main": true, }, }) // Check that allocations restarted in a single batch. batches := getRestartBatches(restarted, []string{"multiple_tasks"}, "main") must.Len(t, 2, batches[0]) must.StrContains(t, stdout, "Restarting 1st batch") must.StrNotContains(t, stdout, "restarting the next batch") }, }, { name: "restart in batches", args: []string{"-batch-size", "3", "-batch-wait", "3s", "-task", "main"}, validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ "single_task": { "main": true, }, "multiple_tasks": { "prestart": false, "sidecar": false, "main": true, }, }) // Check that allocations were properly batched. batches := getRestartBatches(restarted, []string{"multiple_tasks", "single_task"}, "main") must.Len(t, 3, batches[0]) must.StrContains(t, stdout, "Restarting 1st batch of 3 allocations") must.Len(t, 2, batches[1]) must.StrContains(t, stdout, "Restarting 2nd batch of 2 allocations") // Check that we only waited between batches. waitMsgCount := strings.Count(stdout, "Waiting 3s before restarting the next batch") must.Eq(t, 1, waitMsgCount) // Check that batches waited the expected time. batch1Restart := batches[0][0].TaskStates["main"].LastRestart batch2Restart := batches[1][0].TaskStates["main"].LastRestart diff := batch2Restart.Sub(batch1Restart) must.Between(t, 3*time.Second, diff, 4*time.Second) }, }, { name: "restart in percent batch", args: []string{"-batch-size", "50%", "-batch-wait", "3s", "-task", "main"}, validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ "single_task": { "main": true, }, "multiple_tasks": { "prestart": false, "sidecar": false, "main": true, }, }) // Check that allocations were properly batched. batches := getRestartBatches(restarted, []string{"multiple_tasks", "single_task"}, "main") must.Len(t, 3, batches[0]) must.StrContains(t, stdout, "Restarting 1st batch of 3 allocations") must.Len(t, 2, batches[1]) must.StrContains(t, stdout, "Restarting 2nd batch of 2 allocations") // Check that we only waited between batches. waitMsgCount := strings.Count(stdout, "Waiting 3s before restarting the next batch") must.Eq(t, 1, waitMsgCount) // Check that batches waited the expected time. batch1Restart := batches[0][0].TaskStates["main"].LastRestart batch2Restart := batches[1][0].TaskStates["main"].LastRestart diff := batch2Restart.Sub(batch1Restart) must.Between(t, 3*time.Second, diff, 4*time.Second) }, }, { name: "restart in batch ask with yes", args: []string{"-batch-size", "100%", "-batch-wait", "ask", "-yes", "-group", "single_task"}, validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ "single_task": { "main": true, }, "multiple_tasks": { "prestart": false, "sidecar": false, "main": false, }, }) // Check that allocations restarted in a single batch. batches := getRestartBatches(restarted, []string{"single_task"}, "main") must.Len(t, 3, batches[0]) must.StrContains(t, stdout, "Restarting 1st batch") must.StrNotContains(t, stdout, "restarting the next batch") }, }, { name: "reschedule in batches", args: []string{"-reschedule", "-batch-size", "3"}, validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { // Expect all allocations were rescheduled. reschedules := map[string]bool{} for _, alloc := range allocs { reschedules[alloc.ID] = true } waitAllocsRescheduled(t, client, reschedules) // Check that allocations were properly batched. must.StrContains(t, stdout, "Restarting 1st batch of 3 allocations") must.StrContains(t, stdout, "Restarting 2nd batch of 2 allocations") must.StrNotContains(t, stdout, "Waiting") }, }, { name: "reschedule specific group", args: []string{"-reschedule", "-batch-size", "100%", "-group", "single_task"}, validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { // Expect that only allocs for the single_task group were // rescheduled. reschedules := map[string]bool{} for _, alloc := range allocs { if alloc.TaskGroup == "single_task" { reschedules[alloc.ID] = true } } waitAllocsRescheduled(t, client, reschedules) // Check that allocations restarted in a single batch. must.StrContains(t, stdout, "Restarting 1st batch") must.StrNotContains(t, stdout, "restarting the next batch") }, }, } for _, job := range []*api.Job{batchJob, serviceJob} { for _, tc := range testCases { tc := tc t.Run(fmt.Sprintf("%s/%s", *job.Type, tc.name), func(t *testing.T) { // Run each test case in parallel because they are fairly slow. ci.Parallel(t) // Initialize UI and command. ui := cli.NewMockUi() cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} // Start client and server and wait for node to be ready. // User separate cluster for each test case so they can run in // parallel without affecting each other. srv, client, url := testServer(t, true, nil) defer srv.Shutdown() waitForNodes(t, client) // Register test job and wait for its allocs to be running. resp, _, err := client.Jobs().Register(job, nil) must.NoError(t, err) code := waitForSuccess(ui, client, fullId, t, resp.EvalID) must.Zero(t, code, must.Sprintf( "stdout: %s\n\nstderr: %s\n", ui.OutputWriter.String(), ui.ErrorWriter.String()), ) allocStubs, _, err := client.Jobs().Allocations(*job.ID, true, nil) must.NoError(t, err) for _, alloc := range allocStubs { waitForAllocRunning(t, client, alloc.ID) } // Fetch allocations before the restart so we know which ones are // supposed to be affected in case the test reschedules allocs. allocStubs, _, err = client.Jobs().Allocations(*job.ID, true, nil) must.NoError(t, err) // Prepend server URL and append job ID to the test case command. args := []string{"-address", url, "-yes"} args = append(args, tc.args...) args = append(args, *job.ID) // Run job restart command. code = cmd.Run(args) must.Eq(t, code, tc.expectedCode) // Run test case validation function. if tc.validateFn != nil { tc.validateFn(t, client, allocStubs, ui.OutputWriter.String(), ui.ErrorWriter.String()) } }) } } } func TestJobRestartCommand_Run_system_reschedule(t *testing.T) { ci.Parallel(t) // Create a system job. job := api.NewSystemJob("test_job", "test_job", "global", 100). AddDatacenter("dc1"). AddTaskGroup( api.NewTaskGroup("group", 1). AddTask( api.NewTask("task", "mock_driver"). SetConfig("run_for", "1m"). SetConfig("exit_code", 0), ), ) // Start a server and 3 clients. srv, client, url := testServer(t, false, nil) defer srv.Shutdown() srvRPCAddr := srv.GetConfig().AdvertiseAddrs.RPC testClient(t, "client1", newClientAgentConfigFunc("", "", srvRPCAddr)) testClient(t, "client2", newClientAgentConfigFunc("", "", srvRPCAddr)) testClient(t, "client3", newClientAgentConfigFunc("", "", srvRPCAddr)) waitForNodes(t, client) // Initialize UI and command. ui := cli.NewMockUi() cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} // Register test job and wait for its allocs to be running. resp, _, err := client.Jobs().Register(job, nil) must.NoError(t, err) code := waitForSuccess(ui, client, fullId, t, resp.EvalID) must.Zero(t, code, must.Sprintf( "stdout: %s\n\nstderr: %s\n", ui.OutputWriter.String(), ui.ErrorWriter.String()), ) allocStubs, _, err := client.Jobs().Allocations(*job.ID, true, nil) must.NoError(t, err) for _, alloc := range allocStubs { waitForAllocRunning(t, client, alloc.ID) } // Run job restart command. args := []string{"-address", url, "-yes", "-verbose", "-reschedule", *job.ID} code = cmd.Run(args) must.Eq(t, code, 0) reschedules := map[string]bool{} for _, alloc := range allocStubs { reschedules[alloc.ID] = true } waitAllocsRescheduled(t, client, reschedules) // Check that allocations were rescheduled properly. stdout := ui.OutputWriter.String() must.StrContains(t, stdout, "Restarting 3 allocations") for _, alloc := range allocStubs { must.StrContains(t, stdout, fmt.Sprintf(`Rescheduling allocation "%s"`, alloc.ID)) must.StrContains(t, stdout, fmt.Sprintf(`Allocation "%s" replaced by`, alloc.ID)) } } func TestJobRestartCommand_Run_rescheduleNotSupported(t *testing.T) { ci.Parallel(t) // Create a sysbatch job. sysbatchJob := api.NewSysbatchJob("test_sysbatch_job", "test_sysbatch_job", "global", 100). AddDatacenter("dc1"). AddTaskGroup( api.NewTaskGroup("group", 1). AddTask( api.NewTask("task", "mock_driver"). SetConfig("run_for", "1m"). SetConfig("exit_code", 0), ), ) // Start a server and a client. srv, client, url := testServer(t, false, nil) defer srv.Shutdown() srvRPCAddr := srv.GetConfig().AdvertiseAddrs.RPC testClient(t, "client1", newClientAgentConfigFunc("", "", srvRPCAddr)) waitForNodes(t, client) testCases := []struct { name string job *api.Job }{ { name: "sysbatch job", job: sysbatchJob, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { // Initialize UI and command. ui := cli.NewMockUi() cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} // Register test job and wait for its allocs to be running. resp, _, err := client.Jobs().Register(tc.job, nil) must.NoError(t, err) code := waitForSuccess(ui, client, fullId, t, resp.EvalID) must.Zero(t, code, must.Sprintf( "stdout: %s\n\nstderr: %s\n", ui.OutputWriter.String(), ui.ErrorWriter.String()), ) allocStubs, _, err := client.Jobs().Allocations(*tc.job.ID, true, nil) must.NoError(t, err) for _, alloc := range allocStubs { waitForAllocRunning(t, client, alloc.ID) } // Run job restart command and expect error. args := []string{"-address", url, "-yes", "-verbose", "-reschedule", *tc.job.ID} code = cmd.Run(args) must.Eq(t, code, 1) stderr := ui.ErrorWriter.String() must.StrContains(t, stderr, "not allowed to be rescheduled") }) } } func TestJobRestartCommand_jobPrefixAndNamespace(t *testing.T) { ci.Parallel(t) ui := cli.NewMockUi() // Start client and server and wait for node to be ready. srv, client, url := testServer(t, true, nil) defer srv.Shutdown() waitForNodes(t, client) // Create non-default namespace. _, err := client.Namespaces().Register(&api.Namespace{Name: "prod"}, nil) must.NoError(t, err) // Register job with same name in both namespaces. evalIDs := []string{} jobDefault := testJob("test_job_restart") resp, _, err := client.Jobs().Register(jobDefault, nil) must.NoError(t, err) evalIDs = append(evalIDs, resp.EvalID) jobProd := testJob("test_job_restart") jobProd.Namespace = pointer.Of("prod") resp, _, err = client.Jobs().Register(jobProd, nil) must.NoError(t, err) evalIDs = append(evalIDs, resp.EvalID) jobUniqueProd := testJob("test_job_restart_prod_ns") jobUniqueProd.Namespace = pointer.Of("prod") resp, _, err = client.Jobs().Register(jobUniqueProd, nil) must.NoError(t, err) evalIDs = append(evalIDs, resp.EvalID) // Wait for evals to be processed. for _, evalID := range evalIDs { code := waitForSuccess(ui, client, fullId, t, evalID) must.Eq(t, 0, code) } ui.OutputWriter.Reset() testCases := []struct { name string args []string expectedErr string }{ { name: "prefix match in default namespace", args: []string{"test_job"}, }, { name: "invalid job", args: []string{"not-valid"}, expectedErr: "No job(s) with prefix or ID", }, { name: "prefix matches multiple jobs", args: []string{"-namespace", "prod", "test_job"}, expectedErr: "matched multiple jobs", }, { name: "prefix matches multiple jobs across namespaces", args: []string{"-namespace", "*", "test_job"}, expectedErr: "matched multiple jobs", }, { name: "unique prefix match across namespaces", args: []string{"-namespace", "*", "test_job_restart_prod"}, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { defer func() { ui.OutputWriter.Reset() ui.ErrorWriter.Reset() }() cmd := &JobRestartCommand{ Meta: Meta{Ui: &cli.ConcurrentUi{Ui: ui}}, } args := append([]string{"-address", url, "-yes"}, tc.args...) code := cmd.Run(args) if tc.expectedErr != "" { must.NonZero(t, code, must.Sprintf( "stdout: %s\n\nstderr: %s\n", ui.OutputWriter.String(), ui.ErrorWriter.String()), ) must.StrContains(t, ui.ErrorWriter.String(), tc.expectedErr) } else { must.Zero(t, code, must.Sprintf( "stdout: %s\n\nstderr: %s\n", ui.OutputWriter.String(), ui.ErrorWriter.String()), ) } }) } } func TestJobRestartCommand_noAllocs(t *testing.T) { ci.Parallel(t) ui := cli.NewMockUi() cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} // Start client and server and wait for node to be ready. srv, client, url := testServer(t, true, nil) defer srv.Shutdown() waitForNodes(t, client) // Register test job with impossible constraint so it doesn't get allocs. jobID := "test_job_restart_no_allocs" job := testJob(jobID) job.Datacenters = []string{"invalid"} resp, _, err := client.Jobs().Register(job, nil) must.NoError(t, err) code := waitForSuccess(ui, client, fullId, t, resp.EvalID) must.Eq(t, 2, code) // Placement is expected to fail so exit code is not 0. ui.OutputWriter.Reset() // Run job restart command and expect it to exit without restarts. code = cmd.Run([]string{ "-address", url, "-yes", jobID, }) must.Zero(t, code, must.Sprintf( "stdout: %s\n\nstderr: %s\n", ui.OutputWriter.String(), ui.ErrorWriter.String()), ) must.StrContains(t, ui.OutputWriter.String(), "No allocations to restart") } func TestJobRestartCommand_rescheduleFail(t *testing.T) { ci.Parallel(t) ui := cli.NewMockUi() cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} // Start client and server and wait for node to be ready. srv, client, url := testServer(t, true, nil) defer srv.Shutdown() waitForNodes(t, client) // Register test job with 3 allocs. jobID := "test_job_restart_reschedule_fail" job := testJob(jobID) job.Type = pointer.Of(api.JobTypeService) job.TaskGroups[0].Count = pointer.Of(3) job.TaskGroups[0].Tasks[0].Config = map[string]any{"run_for": "10m"} resp, _, err := client.Jobs().Register(job, nil) must.NoError(t, err) code := waitForSuccess(ui, client, fullId, t, resp.EvalID) must.Zero(t, code, must.Sprintf( "stdout: %s\n\nstderr: %s\n", ui.OutputWriter.String(), ui.ErrorWriter.String()), ) ui.OutputWriter.Reset() // Wait for allocs to be running. allocs, _, err := client.Jobs().Allocations(jobID, true, nil) must.NoError(t, err) for _, alloc := range allocs { waitForAllocRunning(t, client, alloc.ID) } // Mark node as ineligible to prevent allocs from being replaced. nodeID := srv.Agent.Client().NodeID() client.Nodes().ToggleEligibility(nodeID, false, nil) // Run job restart command and expect it to fail. code = cmd.Run([]string{ "-address", url, "-batch-size", "2", "-reschedule", "-yes", jobID, }) must.One(t, code) must.StrContains(t, ui.ErrorWriter.String(), "No nodes were eligible for evaluation") } func TestJobRestartCommand_monitorReplacementAlloc(t *testing.T) { ci.Parallel(t) ui := cli.NewMockUi() cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} srv, client, _ := testServer(t, true, nil) defer srv.Shutdown() waitForNodes(t, client) // Register test job and update it twice so we end up with three // allocations, one replacing the next one. jobID := "test_job_restart_monitor_replacement" job := testJob(jobID) for i := 1; i <= 3; i++ { job.TaskGroups[0].Tasks[0].Config["run_for"] = fmt.Sprintf("%ds", i) resp, _, err := client.Jobs().Register(job, nil) must.NoError(t, err) code := waitForSuccess(ui, client, fullId, t, resp.EvalID) must.Zero(t, code, must.Sprintf( "stdout: %s\n\nstderr: %s\n", ui.OutputWriter.String(), ui.ErrorWriter.String()), ) } ui.OutputWriter.Reset() // Prepare the command internals. We want to run a specific function and // target a specific allocation, so we can't run the full command. cmd.client = client cmd.verbose = true cmd.length = fullId // Fetch, sort, and monitor the oldest allocation. allocs, _, err := client.Jobs().Allocations(jobID, true, nil) must.NoError(t, err) sort.Slice(allocs, func(i, j int) bool { return allocs[i].CreateIndex < allocs[j].CreateIndex }) errCh := make(chan error) go cmd.monitorReplacementAlloc(context.Background(), AllocationListStubWithJob{ AllocationListStub: allocs[0], Job: job, }, errCh) // Make sure the command doesn't get stuck and that we traverse the // follow-up allocations properly. must.Wait(t, wait.InitialSuccess( wait.ErrorFunc(func() error { select { case err := <-errCh: return err default: return fmt.Errorf("waiting for response") } }), wait.Timeout(time.Duration(testutil.TestMultiplier()*3)*time.Second), )) must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q replaced by %q", allocs[0].ID, allocs[1].ID)) must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q replaced by %q", allocs[1].ID, allocs[2].ID)) must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q is %q", allocs[2].ID, api.AllocClientStatusRunning)) } func TestJobRestartCommand_activeDeployment(t *testing.T) { ci.Parallel(t) srv, client, url := testServer(t, true, nil) defer srv.Shutdown() waitForNodes(t, client) // Register test job and update it once to trigger a deployment. jobID := "test_job_restart_deployment" job := testJob(jobID) job.Type = pointer.Of(api.JobTypeService) job.Update = &api.UpdateStrategy{ Canary: pointer.Of(1), AutoPromote: pointer.Of(false), } _, _, err := client.Jobs().Register(job, nil) must.NoError(t, err) _, _, err = client.Jobs().Register(job, nil) must.NoError(t, err) // Wait for a deployment to be running. must.Wait(t, wait.InitialSuccess( wait.ErrorFunc(func() error { deployments, _, err := client.Jobs().Deployments(jobID, true, nil) if err != nil { return err } for _, d := range deployments { if d.Status == api.DeploymentStatusRunning { return nil } } return fmt.Errorf("no running deployments") }), wait.Timeout(time.Duration(testutil.TestMultiplier()*3)*time.Second), )) // Run job restart command and expect it to fail. ui := cli.NewMockUi() cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} code := cmd.Run([]string{ "-address", url, "-on-error", jobRestartOnErrorFail, "-verbose", jobID, }) must.One(t, code) must.RegexMatch(t, regexp.MustCompile(`Deployment .+ is "running"`), ui.ErrorWriter.String()) } func TestJobRestartCommand_ACL(t *testing.T) { ci.Parallel(t) // Start server with ACL enabled. srv, client, url := testServer(t, true, func(c *agent.Config) { c.ACL.Enabled = true }) defer srv.Shutdown() rootTokenOpts := &api.WriteOptions{ AuthToken: srv.RootToken.SecretID, } // Register test job. jobID := "test_job_restart_acl" job := testJob(jobID) _, _, err := client.Jobs().Register(job, rootTokenOpts) must.NoError(t, err) // Wait for allocs to be running. waitForJobAllocsStatus(t, client, jobID, api.AllocClientStatusRunning, srv.RootToken.SecretID) testCases := []struct { name string jobPrefix bool aclPolicy string expectedErr string }{ { name: "no token", aclPolicy: "", expectedErr: api.PermissionDeniedErrorContent, }, { name: "alloc-lifecycle not enough", aclPolicy: ` namespace "default" { capabilities = ["alloc-lifecycle"] } `, expectedErr: api.PermissionDeniedErrorContent, }, { name: "read-job not enough", aclPolicy: ` namespace "default" { capabilities = ["read-job"] } `, expectedErr: api.PermissionDeniedErrorContent, }, { name: "alloc-lifecycle and read-job allowed", aclPolicy: ` namespace "default" { capabilities = ["alloc-lifecycle", "read-job"] } `, }, { name: "job prefix requires list-jobs", aclPolicy: ` namespace "default" { capabilities = ["alloc-lifecycle", "read-job"] } `, jobPrefix: true, expectedErr: "job not found", }, { name: "job prefix works with list-jobs", aclPolicy: ` namespace "default" { capabilities = ["list-jobs", "alloc-lifecycle", "read-job"] } `, jobPrefix: true, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ui := cli.NewMockUi() cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} args := []string{ "-address", url, "-yes", } if tc.aclPolicy != "" { // Create ACL token with test case policy. policy := &api.ACLPolicy{ Name: nonAlphaNum.ReplaceAllString(tc.name, "-"), Rules: tc.aclPolicy, } _, err := client.ACLPolicies().Upsert(policy, rootTokenOpts) must.NoError(t, err) token := &api.ACLToken{ Type: "client", Policies: []string{policy.Name}, } token, _, err = client.ACLTokens().Create(token, rootTokenOpts) must.NoError(t, err) // Set token in command args. args = append(args, "-token", token.SecretID) } // Add job ID or job ID prefix to the command. if tc.jobPrefix { args = append(args, jobID[0:3]) } else { args = append(args, jobID) } // Run command. code := cmd.Run(args) if tc.expectedErr == "" { must.Zero(t, code, must.Sprintf( "stdout: %s\n\nstderr: %s\n", ui.OutputWriter.String(), ui.ErrorWriter.String()), ) } else { must.One(t, code, must.Sprintf( "stdout: %s\n\nstderr: %s\n", ui.OutputWriter.String(), ui.ErrorWriter.String()), ) must.StrContains(t, ui.ErrorWriter.String(), tc.expectedErr) } }) } } // TODO(luiz): update once alloc restart supports -no-shutdown-delay. func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) { ci.Parallel(t) // Start client and server and wait for node to be ready. srv, client, url := testServer(t, true, nil) defer srv.Shutdown() waitForNodes(t, client) testCases := []struct { name string args []string shutdownDelay bool }{ { name: "job reschedule with shutdown delay by default", args: []string{"-reschedule"}, shutdownDelay: true, }, { name: "job reschedule no shutdown delay", args: []string{"-reschedule", "-no-shutdown-delay"}, shutdownDelay: false, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ui := cli.NewMockUi() cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} // Register job with 2 allocations and shutdown_delay. shutdownDelay := 3 * time.Second jobID := nonAlphaNum.ReplaceAllString(tc.name, "-") job := testJob(jobID) job.Type = pointer.Of(api.JobTypeService) job.TaskGroups[0].Count = pointer.Of(2) job.TaskGroups[0].Tasks[0].Config = map[string]any{"run_for": "10m"} job.TaskGroups[0].Tasks[0].ShutdownDelay = shutdownDelay job.TaskGroups[0].Tasks[0].Services = []*api.Service{{ Name: "service", Provider: "nomad", }} resp, _, err := client.Jobs().Register(job, nil) must.NoError(t, err) code := waitForSuccess(ui, client, fullId, t, resp.EvalID) must.Zero(t, code, must.Sprintf( "stdout:\n%s\n\nstderr:\n%s\n", ui.OutputWriter.String(), ui.ErrorWriter.String()), ) ui.OutputWriter.Reset() // Wait for alloc to be running. allocStubs, _, err := client.Jobs().Allocations(jobID, true, nil) must.NoError(t, err) for _, alloc := range allocStubs { waitForAllocRunning(t, client, alloc.ID) } // Add address and job ID to the command and run. args := []string{ "-address", url, "-batch-size", "1", "-batch-wait", "0", "-yes", } args = append(args, tc.args...) args = append(args, jobID) code = cmd.Run(args) must.Zero(t, code, must.Sprintf( "stdout:\n%s\n\nstderr:\n%s\n", ui.OutputWriter.String(), ui.ErrorWriter.String()), ) // Wait for all allocs to restart. reschedules := map[string]bool{} for _, alloc := range allocStubs { reschedules[alloc.ID] = true } allocs := waitAllocsRescheduled(t, client, reschedules) // Check that allocs have shutdown delay event. for _, alloc := range allocs { for _, s := range alloc.TaskStates { var killedEv *api.TaskEvent var killingEv *api.TaskEvent for _, ev := range s.Events { if strings.Contains(ev.Type, "Killed") { killedEv = ev } if strings.Contains(ev.Type, "Killing") { killingEv = ev } } diff := killedEv.Time - killingEv.Time if tc.shutdownDelay { must.GreaterEq(t, shutdownDelay, time.Duration(diff)) } else { // Add a bit of slack to account for the actual // shutdown time of the task. must.Between(t, shutdownDelay, time.Duration(diff), shutdownDelay+time.Second) } } } }) } } func TestJobRestartCommand_filterAllocs(t *testing.T) { ci.Parallel(t) task1 := api.NewTask("task_1", "mock_driver") task2 := api.NewTask("task_2", "mock_driver") task3 := api.NewTask("task_3", "mock_driver") jobV1 := api.NewServiceJob("example", "example", "global", 1). AddTaskGroup( api.NewTaskGroup("group_1", 1). AddTask(task1), ). AddTaskGroup( api.NewTaskGroup("group_2", 1). AddTask(task1). AddTask(task2), ). AddTaskGroup( api.NewTaskGroup("group_3", 1). AddTask(task3), ) jobV1.Version = pointer.Of(uint64(1)) jobV2 := api.NewServiceJob("example", "example", "global", 1). AddTaskGroup( api.NewTaskGroup("group_1", 1). AddTask(task1), ). AddTaskGroup( api.NewTaskGroup("group_2", 1). AddTask(task2), ) jobV2.Version = pointer.Of(uint64(2)) allAllocs := []AllocationListStubWithJob{} allocs := map[string]AllocationListStubWithJob{} for _, job := range []*api.Job{jobV1, jobV2} { for _, tg := range job.TaskGroups { for _, desired := range []string{api.AllocDesiredStatusRun, api.AllocDesiredStatusStop} { for _, client := range []string{api.AllocClientStatusRunning, api.AllocClientStatusComplete} { key := fmt.Sprintf("job_v%d_%s_%s_%s", *job.Version, *tg.Name, desired, client) alloc := AllocationListStubWithJob{ AllocationListStub: &api.AllocationListStub{ ID: key, JobVersion: *job.Version, TaskGroup: *tg.Name, DesiredStatus: desired, ClientStatus: client, }, Job: job, } allocs[key] = alloc allAllocs = append(allAllocs, alloc) // Allocations with a replacement must always be skipped. replacedAlloc := AllocationListStubWithJob{ AllocationListStub: &api.AllocationListStub{ ID: key, JobVersion: *job.Version, TaskGroup: *tg.Name, DesiredStatus: desired, ClientStatus: client, NextAllocation: alloc.ID, }, Job: job, } allocs[key+"_replaced"] = replacedAlloc allAllocs = append(allAllocs, replacedAlloc) } } } } testCases := []struct { name string args []string expectedAllocs []AllocationListStubWithJob }{ { name: "skip by group", args: []string{"-group", "group_1"}, expectedAllocs: []AllocationListStubWithJob{ allocs["job_v1_group_1_run_running"], allocs["job_v1_group_1_run_complete"], allocs["job_v1_group_1_stop_running"], allocs["job_v2_group_1_run_running"], allocs["job_v2_group_1_run_complete"], allocs["job_v2_group_1_stop_running"], }, }, { name: "skip by old group", args: []string{"-group", "group_3"}, expectedAllocs: []AllocationListStubWithJob{ allocs["job_v1_group_3_run_running"], allocs["job_v1_group_3_run_complete"], allocs["job_v1_group_3_stop_running"], }, }, { name: "skip by task", args: []string{"-task", "task_2"}, expectedAllocs: []AllocationListStubWithJob{ allocs["job_v1_group_2_run_running"], allocs["job_v1_group_2_run_complete"], allocs["job_v1_group_2_stop_running"], allocs["job_v2_group_2_run_running"], allocs["job_v2_group_2_run_complete"], allocs["job_v2_group_2_stop_running"], }, }, { name: "skip by old task", args: []string{"-task", "task_3"}, expectedAllocs: []AllocationListStubWithJob{ allocs["job_v1_group_3_run_running"], allocs["job_v1_group_3_run_complete"], allocs["job_v1_group_3_stop_running"], }, }, { name: "skip by group and task", args: []string{ "-group", "group_1", "-group", "group_2", "-task", "task_2", }, // Only group_2 has task_2 in all job versions. expectedAllocs: []AllocationListStubWithJob{ allocs["job_v1_group_2_run_running"], allocs["job_v1_group_2_run_complete"], allocs["job_v1_group_2_stop_running"], allocs["job_v2_group_2_run_running"], allocs["job_v2_group_2_run_complete"], allocs["job_v2_group_2_stop_running"], }, }, { name: "skip by status", args: []string{}, expectedAllocs: []AllocationListStubWithJob{ allocs["job_v1_group_1_run_running"], allocs["job_v1_group_1_run_complete"], allocs["job_v1_group_1_stop_running"], allocs["job_v1_group_2_run_running"], allocs["job_v1_group_2_run_complete"], allocs["job_v1_group_2_stop_running"], allocs["job_v1_group_3_run_running"], allocs["job_v1_group_3_run_complete"], allocs["job_v1_group_3_stop_running"], allocs["job_v2_group_1_run_running"], allocs["job_v2_group_1_run_complete"], allocs["job_v2_group_1_stop_running"], allocs["job_v2_group_2_run_running"], allocs["job_v2_group_2_run_complete"], allocs["job_v2_group_2_stop_running"], }, }, { name: "no matches by group", args: []string{"-group", "group_404"}, expectedAllocs: []AllocationListStubWithJob{}, }, { name: "no matches by task", args: []string{"-task", "task_404"}, expectedAllocs: []AllocationListStubWithJob{}, }, { name: "no matches by task with group", args: []string{ "-group", "group_1", "-task", "task_2", // group_1 never has task_2. }, expectedAllocs: []AllocationListStubWithJob{}, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ui := cli.NewMockUi() cmd := &JobRestartCommand{ Meta: Meta{Ui: &cli.ConcurrentUi{Ui: ui}}, } args := append(tc.args, "-verbose", "-yes", "example") code, err := cmd.parseAndValidate(args) must.NoError(t, err) must.Zero(t, code, must.Sprintf( "stdout: %s\n\nstderr: %s\n", ui.OutputWriter.String(), ui.ErrorWriter.String()), ) got := cmd.filterAllocs(allAllocs) must.SliceEqFunc(t, tc.expectedAllocs, got, func(a, b AllocationListStubWithJob) bool { return a.ID == b.ID }) expected := set.FromFunc(tc.expectedAllocs, func(a AllocationListStubWithJob) string { return a.ID }) for _, a := range allAllocs { if !expected.Contains(a.ID) { must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("Skipping allocation %q", a.ID)) } } }) } } func TestJobRestartCommand_onErrorFail(t *testing.T) { ci.Parallel(t) ui := cli.NewMockUi() cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} // Start client and server and wait for node to be ready. srv, client, url := testServer(t, true, nil) defer srv.Shutdown() parsedURL, err := neturl.Parse(url) must.NoError(t, err) waitForNodes(t, client) // Register a job with 3 allocations. jobID := "test_job_restart_command_fail_on_error" job := testJob(jobID) job.TaskGroups[0].Count = pointer.Of(3) resp, _, err := client.Jobs().Register(job, nil) must.NoError(t, err) code := waitForSuccess(ui, client, fullId, t, resp.EvalID) must.Zero(t, code, must.Sprintf( "stdout: %s\n\nstderr: %s\n", ui.OutputWriter.String(), ui.ErrorWriter.String()), ) ui.OutputWriter.Reset() // Create a proxy to inject an error after 2 allocation restarts. // Also counts how many restart requests are made so we can check that the // command stops after the error happens. var allocRestarts int32 proxy := httptest.NewServer(&httputil.ReverseProxy{ ModifyResponse: func(resp *http.Response) error { if strings.HasSuffix(resp.Request.URL.Path, "/restart") { count := atomic.AddInt32(&allocRestarts, 1) if count == 2 { return fmt.Errorf("fail") } } return nil }, Rewrite: func(r *httputil.ProxyRequest) { r.SetURL(parsedURL) }, }) defer proxy.Close() // Run command with -fail-on-error. // Expect only 2 restarts requests even though there are 3 allocations. code = cmd.Run([]string{ "-address", proxy.URL, "-on-error", jobRestartOnErrorFail, jobID, }) must.One(t, code) must.Eq(t, 2, allocRestarts) } // waitTasksRestarted blocks until the given allocations have restarted or not. // Returns a list with updated state of the allocations. // // To determine if a restart happened the function looks for a "Restart // Signaled" event in the list of task events. Allocations that are reused // between tests may contain a restart event from a past test case, leading to // false positives. // // The restarts map contains values structured as group:task:. func waitTasksRestarted( t *testing.T, client *api.Client, allocs []*api.AllocationListStub, restarts map[string]map[string]bool, ) []*api.Allocation { t.Helper() var newAllocs []*api.Allocation testutil.WaitForResult(func() (bool, error) { newAllocs = make([]*api.Allocation, 0, len(allocs)) for _, alloc := range allocs { if _, ok := restarts[alloc.TaskGroup]; !ok { t.Fatalf("Missing group %q in restarts map", alloc.TaskGroup) } // Skip allocations that are not supposed to be running. if alloc.DesiredStatus != api.AllocDesiredStatusRun { continue } updated, _, err := client.Allocations().Info(alloc.ID, nil) if err != nil { return false, err } newAllocs = append(newAllocs, updated) for task, state := range updated.TaskStates { restarted := false for _, ev := range state.Events { if ev.Type == api.TaskRestartSignal { restarted = true break } } if restarted && !restarts[updated.TaskGroup][task] { return false, fmt.Errorf( "task %q in alloc %s for group %q not expected to restart", task, updated.ID, updated.TaskGroup, ) } if !restarted && restarts[updated.TaskGroup][task] { return false, fmt.Errorf( "task %q in alloc %s for group %q expected to restart but didn't", task, updated.ID, updated.TaskGroup, ) } } } return true, nil }, func(err error) { must.NoError(t, err) }) return newAllocs } // waitAllocsRescheduled blocks until the given allocations have been // rescueduled or not. Returns a list with updated state of the allocations. // // To determined if an allocation has been rescheduled the function looks for // a non-empty NextAllocation field. // // The reschedules map maps allocation IDs to a boolean indicating if a // reschedule is expected for that allocation. func waitAllocsRescheduled(t *testing.T, client *api.Client, reschedules map[string]bool) []*api.Allocation { t.Helper() var newAllocs []*api.Allocation testutil.WaitForResult(func() (bool, error) { newAllocs = make([]*api.Allocation, 0, len(reschedules)) for allocID, reschedule := range reschedules { alloc, _, err := client.Allocations().Info(allocID, nil) if err != nil { return false, err } newAllocs = append(newAllocs, alloc) wasRescheduled := alloc.NextAllocation != "" if wasRescheduled && !reschedule { return false, fmt.Errorf("alloc %s not expected to be rescheduled", alloc.ID) } if !wasRescheduled && reschedule { return false, fmt.Errorf("alloc %s expected to be rescheduled but wasn't", alloc.ID) } } return true, nil }, func(err error) { must.NoError(t, err) }) return newAllocs } // getRestartBatches returns a list of allocations per batch of restarts. // // Since restarts are issued concurrently, it's expected that allocations in // the same batch have fairly close LastRestart times, so a 1s delay between // restarts may be enough to indicate a new batch. func getRestartBatches(allocs []*api.Allocation, groups []string, task string) [][]*api.Allocation { groupsSet := set.From(groups) batches := [][]*api.Allocation{} type allocRestart struct { alloc *api.Allocation restart time.Time } restarts := make([]allocRestart, 0, len(allocs)) for _, alloc := range allocs { if !groupsSet.Contains(alloc.TaskGroup) { continue } restarts = append(restarts, allocRestart{ alloc: alloc, restart: alloc.TaskStates[task].LastRestart, }) } sort.Slice(restarts, func(i, j int) bool { return restarts[i].restart.Before(restarts[j].restart) }) prev := restarts[0].restart batch := []*api.Allocation{} for _, r := range restarts { if r.restart.Sub(prev) >= time.Second { prev = r.restart batches = append(batches, batch) batch = []*api.Allocation{} } batch = append(batch, r.alloc) } batches = append(batches, batch) return batches }