diff --git a/client/task_runner_test.go b/client/task_runner_test.go
index e7e2b19db..e2cb99209 100644
--- a/client/task_runner_test.go
+++ b/client/task_runner_test.go
@@ -536,7 +536,7 @@ func TestTaskRunner_RestartTask(t *testing.T) {
 	task.Driver = "mock_driver"
 	task.Config = map[string]interface{}{
 		"exit_code": "0",
-		"run_for":   "10s",
+		"run_for":   "100s",
 	}
 
 	ctx := testTaskRunnerFromAlloc(t, true, alloc)
diff --git a/command/job_dispatch.go b/command/job_dispatch.go
index c6a334e98..702fa4a97 100644
--- a/command/job_dispatch.go
+++ b/command/job_dispatch.go
@@ -51,7 +51,7 @@ Dispatch Options:
 }
 
 func (c *JobDispatchCommand) Synopsis() string {
-	return "Dispatch an instance of a parametereized job"
+	return "Dispatch an instance of a parameterized job"
 }
 
 func (c *JobDispatchCommand) Run(args []string) int {
diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index 76eac8917..8cb170045 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -192,6 +192,8 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
 
 	// The Evaluation GC should not handle batch jobs since those need to be
 	// garbage collected in one shot
+	// XXX believe there is a bug that if a batch job gets stopped, there is no
+	// way for it to GC the eval/allocs
 	gc, allocs, err := c.gcEval(eval, oldThreshold, false)
 	if err != nil {
 		return err
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index f6c75753d..72bd4bf66 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -965,6 +965,66 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
 	}
 }
 
+// This test ensures parameterized and periodic jobs don't get GCd
+func TestCoreScheduler_JobGC_NonGCable(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
+	s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
+
+	// Insert a parameterized job.
+	state := s1.fsm.State()
+	job := mock.Job()
+	job.Type = structs.JobTypeBatch
+	job.Status = structs.JobStatusRunning
+	job.ParameterizedJob = &structs.ParameterizedJobConfig{
+		Payload: structs.DispatchPayloadRequired,
+	}
+	err := state.UpsertJob(1000, job)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert a periodic job.
+	job2 := mock.PeriodicJob()
+	if err := state.UpsertJob(1001, job2); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Create a core scheduler
+	snap, err := state.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Attempt the GC
+	gc := s1.coreJobEval(structs.CoreJobForceGC, 1002)
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Should still exist
+	out, err := state.JobByID(job.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out == nil {
+		t.Fatalf("bad: %v", out)
+	}
+
+	outE, err := state.JobByID(job2.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if outE == nil {
+		t.Fatalf("bad: %v", outE)
+	}
+}
+
 func TestCoreScheduler_PartitionReap(t *testing.T) {
 	s1 := testServer(t, nil)
 	defer s1.Shutdown()
diff --git a/nomad/heartbeat_test.go b/nomad/heartbeat_test.go
index f14bfd8b1..7ab5495a8 100644
--- a/nomad/heartbeat_test.go
+++ b/nomad/heartbeat_test.go
@@ -70,7 +70,7 @@ func TestResetHeartbeatTimerLocked(t *testing.T) {
 		t.Fatalf("missing timer")
 	}
 
-	time.Sleep(10 * time.Millisecond)
+	time.Sleep(time.Duration(testutil.TestMultiplier()*10) * time.Millisecond)
 
 	if _, ok := s1.heartbeatTimers["foo"]; ok {
 		t.Fatalf("timer should be gone")
@@ -99,7 +99,7 @@ func TestResetHeartbeatTimerLocked_Renew(t *testing.T) {
 	renew := time.Now()
 
 	// Watch for invalidation
-	for time.Now().Sub(renew) < 20*time.Millisecond {
+	for time.Now().Sub(renew) < time.Duration(testutil.TestMultiplier()*20)*time.Millisecond {
 		s1.heartbeatTimersLock.Lock()
 		_, ok := s1.heartbeatTimers["foo"]
 		s1.heartbeatTimersLock.Unlock()
diff --git a/nomad/state/schema.go b/nomad/state/schema.go
index aed0e7ab3..f61b1d109 100644
--- a/nomad/state/schema.go
+++ b/nomad/state/schema.go
@@ -149,9 +149,10 @@ func jobIsGCable(obj interface{}) (bool, error) {
 		return false, fmt.Errorf("Unexpected type: %v", obj)
 	}
 
-	// The job is GCable if it is batch and it is not periodic
+	// The job is GCable if it is batch, it is not periodic and is not a
+	// parameterized job.
 	periodic := j.Periodic != nil && j.Periodic.Enabled
-	gcable := j.Type == structs.JobTypeBatch && !periodic
+	gcable := j.Type == structs.JobTypeBatch && !periodic && !j.IsParameterized()
 	return gcable, nil
 }
 
diff --git a/nomad/structs/config/consul.go b/nomad/structs/config/consul.go
index ee47a793a..19ee82c71 100644
--- a/nomad/structs/config/consul.go
+++ b/nomad/structs/config/consul.go
@@ -1,7 +1,6 @@
 package config
 
 import (
-	"crypto/tls"
 	"fmt"
 	"net/http"
 	"strings"
@@ -121,7 +120,7 @@ func (a *ConsulConfig) Merge(b *ConsulConfig) *ConsulConfig {
 		result.EnableSSL = b.EnableSSL
 	}
 	if b.VerifySSL != nil {
-		result.VerifySSL = b.EnableSSL
+		result.VerifySSL = b.VerifySSL
 	}
 	if b.CAFile != "" {
 		result.CAFile = b.CAFile
@@ -180,6 +179,7 @@ func (c *ConsulConfig) ApiConfig() (*consul.Config, error) {
 		if c.VerifySSL != nil {
 			tlsConfig.InsecureSkipVerify = !*c.VerifySSL
 		}
+
 		tlsClientCfg, err := consul.SetupTLSConfig(&tlsConfig)
 		if err != nil {
 			return nil, fmt.Errorf("error creating tls client config for consul: %v", err)
@@ -188,13 +188,6 @@
 			TLSClientConfig: tlsClientCfg,
 		}
 	}
-	if c.EnableSSL != nil && !*c.VerifySSL {
-		config.HttpClient.Transport = &http.Transport{
-			TLSClientConfig: &tls.Config{
-				InsecureSkipVerify: true,
-			},
-		}
-	}
 
 	return config, nil
 }
diff --git a/nomad/vault_test.go b/nomad/vault_test.go
index 28812f0f1..cfa639e71 100644
--- a/nomad/vault_test.go
+++ b/nomad/vault_test.go
@@ -591,9 +591,6 @@ func TestVaultClient_LookupToken_RateLimit(t *testing.T) {
 	unblock := make(chan struct{})
 	for i := 0; i < numRequests; i++ {
 		go func() {
-			// Ensure all the goroutines are made
-			time.Sleep(10 * time.Millisecond)
-
 			// Lookup ourselves
 			_, err := client.LookupToken(ctx, v.Config.Token)
 			if err != nil {
@@ -607,7 +604,7 @@
 
 			// Cancel the context
 			cancel()
-			time.AfterFunc(1*time.Second, func() { close(unblock) })
+			close(unblock)
 		}()
 	}
 
@@ -618,9 +615,15 @@
 	}
 
 	desired := numRequests - 1
-	if cancels != desired {
-		t.Fatalf("Incorrect number of cancels; got %d; want %d", cancels, desired)
-	}
+	testutil.WaitForResult(func() (bool, error) {
+		if cancels != desired {
+			return false, fmt.Errorf("Incorrect number of cancels; got %d; want %d", cancels, desired)
+		}
+
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("Connection not established")
+	})
 }
 
 func TestVaultClient_CreateToken_Root(t *testing.T) {
diff --git a/website/source/docs/cluster/requirements.html.md b/website/source/docs/cluster/requirements.html.md
index bb8943ab9..8f22143df 100644
--- a/website/source/docs/cluster/requirements.html.md
+++ b/website/source/docs/cluster/requirements.html.md
@@ -54,3 +54,18 @@ are not participating in Raft. Thus clients can have 100+ millisecond latency
 to their servers. This allows having a set of Nomad servers that service
 clients that can be spread geographically over a continent or even the world
 in the case of having a single "global" region and many datacenter.
+
+## Ports Used
+
+Nomad requires 3 different ports to work properly on servers and 2 on clients,
+some on TCP, UDP, or both protocols. Below we document the requirements for each
+port.
+
+* HTTP API (Default 4646). This is used by clients and servers to serve the HTTP
+  API. TCP only.
+
+* RPC (Default 4647). This is used by servers and clients to communicate amongst
+  each other. TCP only.
+
+* Serf WAN (Default 4648). This is used by servers to gossip over the WAN to
+  other servers. TCP and UDP.
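
The docs change above lists which ports a Nomad agent needs open. As a quick illustration (not part of the patch), here is a minimal Go sketch that dials each default server port over TCP; the host address, timeout, and port list are assumptions for the example, and a plain TCP dial does not exercise the UDP side of Serf.

// portcheck.go: probe the default Nomad server ports described above.
// The host value is a placeholder; run from a machine that needs to reach
// the servers. Serf's UDP usage is not covered by this TCP-only check.
package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	host := "127.0.0.1" // assumed address of a Nomad server
	ports := map[string]string{
		"HTTP API": "4646",
		"RPC":      "4647",
		"Serf WAN": "4648",
	}
	for name, port := range ports {
		addr := net.JoinHostPort(host, port)
		conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
		if err != nil {
			fmt.Printf("%-8s %s: not reachable over TCP (%v)\n", name, addr, err)
			continue
		}
		conn.Close()
		fmt.Printf("%-8s %s: reachable over TCP\n", name, addr)
	}
}

A refused or timed-out dial on one of the three ports usually points at a firewall rule between the client and the servers rather than at Nomad itself.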