Merge pull request #3484 from hashicorp/b-nomad-0.7.1

merge nomad 0.7.1 branch
This commit is contained in:
Preetha 2017-11-01 16:50:37 -05:00 committed by GitHub
commit 2f67e839c1
38 changed files with 843 additions and 244 deletions

View file

@ -1,3 +1,14 @@
## 0.7.1 (Unreleased)
IMPROVEMENTS:
* api: Allocations now track and return modify time in addition to create time.
* cli: Allocation create and modify times are displayed in a human-readable relative format like `6h ago`.
* core: Allow agents to be run in `rpc_upgrade_mode` when migrating a cluster
to TLS rather than changing `heartbeat_grace`.
BUG FIXES:
## 0.7.0 (November 1, 2017)
__BACKWARDS INCOMPATIBILITIES:__

View file

@ -95,6 +95,7 @@ type Allocation struct {
ModifyIndex uint64
AllocModifyIndex uint64
CreateTime int64
ModifyTime int64
}
// AllocationMetric is used to deserialize allocation metrics.
@ -132,11 +133,12 @@ type AllocationListStub struct {
CreateIndex uint64
ModifyIndex uint64
CreateTime int64
ModifyTime int64
}
// AllocDeploymentStatus captures the status of the allocation as part of the
// deployment. This can include things like if the allocation has been marked as
// heatlhy.
// healthy.
type AllocDeploymentStatus struct {
Healthy *bool
ModifyIndex uint64

View file

@ -76,8 +76,13 @@ type AllocRunner struct {
// to call.
prevAlloc prevAllocWatcher
// ctx is cancelled with exitFn to cause the alloc to be destroyed
// (stopped and GC'd).
ctx context.Context
exitFn context.CancelFunc
// waitCh is closed when the Run method exits. At that point the alloc
// has stopped and been GC'd.
waitCh chan struct{}
// State related fields
@ -917,11 +922,6 @@ func (r *AllocRunner) handleDestroy() {
// state as we wait for a destroy.
alloc := r.Alloc()
//TODO(schmichael) updater can cause a GC which can block on this alloc
// runner shutting down. Since handleDestroy can be called by Run() we
// can't block shutdown here as it would cause a deadlock.
go r.updater(alloc)
// Broadcast and persist state synchronously
r.sendBroadcast(alloc)
if err := r.saveAllocRunnerState(); err != nil {
@ -935,6 +935,11 @@ func (r *AllocRunner) handleDestroy() {
r.logger.Printf("[ERR] client: alloc %q unable unmount task directories: %v", r.allocID, err)
}
// Update the server with the alloc's status -- also marks the alloc as
// being eligible for GC, so from this point on the alloc can be gc'd
// at any time.
r.updater(alloc)
for {
select {
case <-r.ctx.Done():
@ -1065,6 +1070,17 @@ func (r *AllocRunner) Destroy() {
r.allocBroadcast.Close()
}
// IsDestroyed returns true if the AllocRunner is not running and has been
// destroyed (GC'd).
func (r *AllocRunner) IsDestroyed() bool {
select {
case <-r.waitCh:
return true
default:
return false
}
}
// WaitCh returns a channel to wait for termination
func (r *AllocRunner) WaitCh() <-chan struct{} {
return r.waitCh
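A minimal usage sketch of these two accessors, assuming an `ar *AllocRunner` in hand (the surrounding calls are illustrative, not part of the diff):

```go
// Block until the alloc runner has fully stopped and been GC'd.
<-ar.WaitCh()

// Or poll without blocking, as the client's NumAllocs counter does.
if ar.IsDestroyed() {
	// this alloc no longer counts against GCMaxAllocs
}
```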

View file

@ -124,7 +124,8 @@ type Client struct {
// successfully
serversDiscoveredCh chan struct{}
// allocs is the current set of allocations
// allocs maps alloc IDs to their AllocRunner. This map includes all
// AllocRunners - running and GC'd - until the server GCs them.
allocs map[string]*AllocRunner
allocLock sync.RWMutex
@ -486,15 +487,16 @@ func (c *Client) Stats() map[string]map[string]string {
return stats
}
// CollectAllocation garbage collects a single allocation
func (c *Client) CollectAllocation(allocID string) error {
// CollectAllocation garbage collects a single allocation on a node. Returns
// true if alloc was found and garbage collected; otherwise false.
func (c *Client) CollectAllocation(allocID string) bool {
return c.garbageCollector.Collect(allocID)
}
// CollectAllAllocs garbage collects all allocations on a node in the terminal
// state
func (c *Client) CollectAllAllocs() error {
return c.garbageCollector.CollectAll()
func (c *Client) CollectAllAllocs() {
c.garbageCollector.CollectAll()
}
// Node returns the locally registered node
@ -721,11 +723,16 @@ func (c *Client) getAllocRunners() map[string]*AllocRunner {
return runners
}
// NumAllocs returns the number of allocs this client has. Used to
// NumAllocs returns the number of un-GC'd allocs this client has. Used to
// fulfill the AllocCounter interface for the GC.
func (c *Client) NumAllocs() int {
n := 0
c.allocLock.RLock()
n := len(c.allocs)
for _, a := range c.allocs {
if !a.IsDestroyed() {
n++
}
}
c.allocLock.RUnlock()
return n
}
@ -1205,6 +1212,7 @@ func (c *Client) updateNodeStatus() error {
for _, s := range resp.Servers {
addr, err := resolveServer(s.RPCAdvertiseAddr)
if err != nil {
c.logger.Printf("[WARN] client: ignoring invalid server %q: %v", s.RPCAdvertiseAddr, err)
continue
}
e := endpoint{name: s.RPCAdvertiseAddr, addr: addr}
@ -1234,9 +1242,19 @@ func (c *Client) updateNodeStatus() error {
// updateAllocStatus is used to update the status of an allocation
func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
if alloc.Terminated() {
// Terminated, mark for GC
if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
// Terminated, mark for GC if we're still tracking this alloc
// runner. If it's not being tracked that means the server has
// already GC'd it (see removeAlloc).
c.allocLock.RLock()
ar, ok := c.allocs[alloc.ID]
c.allocLock.RUnlock()
if ok {
c.garbageCollector.MarkForCollection(ar)
// Trigger a GC in case we're over thresholds and just
// waiting for eligible allocs.
c.garbageCollector.Trigger()
}
}
@ -1531,9 +1549,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
// Remove the old allocations
for _, remove := range diff.removed {
if err := c.removeAlloc(remove); err != nil {
c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", remove.ID, err)
}
c.removeAlloc(remove)
}
// Update the existing allocations
@ -1544,6 +1560,11 @@ func (c *Client) runAllocs(update *allocUpdates) {
}
}
// Make room for new allocations before running
if err := c.garbageCollector.MakeRoomFor(diff.added); err != nil {
c.logger.Printf("[ERR] client: error making room for new allocations: %v", err)
}
// Start the new allocations
for _, add := range diff.added {
migrateToken := update.migrateTokens[add.ID]
@ -1552,26 +1573,33 @@ func (c *Client) runAllocs(update *allocUpdates) {
add.ID, err)
}
}
// Trigger the GC once more now that new allocs are started that could
// have caused thresholds to be exceeded
c.garbageCollector.Trigger()
}
// removeAlloc is invoked when we should remove an allocation
func (c *Client) removeAlloc(alloc *structs.Allocation) error {
// removeAlloc is invoked when we should remove an allocation because it has
// been removed by the server.
func (c *Client) removeAlloc(alloc *structs.Allocation) {
c.allocLock.Lock()
ar, ok := c.allocs[alloc.ID]
if !ok {
c.allocLock.Unlock()
c.logger.Printf("[WARN] client: missing context for alloc '%s'", alloc.ID)
return nil
return
}
// Stop tracking alloc runner as it's been GC'd by the server
delete(c.allocs, alloc.ID)
c.allocLock.Unlock()
// Ensure the GC has a reference and then collect. Collecting through the GC
// applies rate limiting
c.garbageCollector.MarkForCollection(ar)
go c.garbageCollector.Collect(alloc.ID)
return nil
// GC immediately since the server has GC'd it
go c.garbageCollector.Collect(alloc.ID)
}
// updateAlloc is invoked when we should update an allocation
@ -1592,9 +1620,9 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error {
// Check if we already have an alloc runner
c.allocLock.Lock()
defer c.allocLock.Unlock()
if _, ok := c.allocs[alloc.ID]; ok {
c.logger.Printf("[DEBUG]: client: dropping duplicate add allocation request: %q", alloc.ID)
c.allocLock.Unlock()
return nil
}
@ -1618,14 +1646,6 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err)
}
// Must release allocLock as GC acquires it to count allocs
c.allocLock.Unlock()
// Make room for the allocation before running it
if err := c.garbageCollector.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
c.logger.Printf("[ERR] client: error making room for allocation: %v", err)
}
go ar.Run()
return nil
}

View file

@ -74,6 +74,9 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
cb(config)
}
// Enable raft as leader if we have bootstrap on
config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap
for i := 10; i >= 0; i-- {
ports := freeport.GetT(t, 2)
config.RPCAddr = &net.TCPAddr{
@ -657,7 +660,6 @@ func TestClient_WatchAllocs(t *testing.T) {
alloc2.JobID = job.ID
alloc2.Job = job
// Insert at zero so they are pulled
state := s1.State()
if err := state.UpsertJob(100, job); err != nil {
t.Fatal(err)
@ -681,23 +683,20 @@ func TestClient_WatchAllocs(t *testing.T) {
})
// Delete one allocation
err = state.DeleteEval(103, nil, []string{alloc1.ID})
if err != nil {
if err := state.DeleteEval(103, nil, []string{alloc1.ID}); err != nil {
t.Fatalf("err: %v", err)
}
// Update the other allocation. Have to make a copy because the allocs are
// shared in memory in the test and the modify index would be updated in the
// alloc runner.
alloc2_2 := new(structs.Allocation)
*alloc2_2 = *alloc2
alloc2_2 := alloc2.Copy()
alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(104, []*structs.Allocation{alloc2_2})
if err != nil {
t.Fatalf("err: %v", err)
if err := state.UpsertAllocs(104, []*structs.Allocation{alloc2_2}); err != nil {
t.Fatalf("err upserting stopped alloc: %v", err)
}
// One allocations should get de-registered
// One allocation should get GC'd and removed
testutil.WaitForResult(func() (bool, error) {
c1.allocLock.RLock()
num := len(c1.allocs)

View file

@ -28,21 +28,36 @@ type GCConfig struct {
ParallelDestroys int
}
// AllocCounter is used by AllocGarbageCollector to discover how many
// allocations a node has and is generally fulfilled by the Client.
// AllocCounter is used by AllocGarbageCollector to discover how many un-GC'd
// allocations a client has and is generally fulfilled by the Client.
type AllocCounter interface {
NumAllocs() int
}
// AllocGarbageCollector garbage collects terminated allocations on a node
type AllocGarbageCollector struct {
allocRunners *IndexedGCAllocPQ
config *GCConfig
// allocRunners marked for GC
allocRunners *IndexedGCAllocPQ
// statsCollector for node based thresholds (eg disk)
statsCollector stats.NodeStatsCollector
allocCounter AllocCounter
config *GCConfig
logger *log.Logger
destroyCh chan struct{}
shutdownCh chan struct{}
// allocCounter returns the number of un-GC'd allocs on this node
allocCounter AllocCounter
// destroyCh is a semaphore for rate limiting concurrent garbage
// collections
destroyCh chan struct{}
// shutdownCh is closed when the GC's run method should exit
shutdownCh chan struct{}
// triggerCh is ticked by the Trigger method to cause a GC
triggerCh chan struct{}
logger *log.Logger
}
// NewAllocGarbageCollector returns a garbage collector for terminated
@ -51,7 +66,7 @@ type AllocGarbageCollector struct {
func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, ac AllocCounter, config *GCConfig) *AllocGarbageCollector {
// Require at least 1 to make progress
if config.ParallelDestroys <= 0 {
logger.Printf("[WARN] client: garbage collector defaulting parallism to 1 due to invalid input value of %d", config.ParallelDestroys)
logger.Printf("[WARN] client.gc: garbage collector defaulting parallism to 1 due to invalid input value of %d", config.ParallelDestroys)
config.ParallelDestroys = 1
}
@ -63,6 +78,7 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats
logger: logger,
destroyCh: make(chan struct{}, config.ParallelDestroys),
shutdownCh: make(chan struct{}),
triggerCh: make(chan struct{}, 1),
}
return gc
@ -71,16 +87,28 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats
// Run the periodic garbage collector.
func (a *AllocGarbageCollector) Run() {
ticker := time.NewTicker(a.config.Interval)
a.logger.Printf("[DEBUG] client.gc: GC'ing ever %v", a.config.Interval)
for {
select {
case <-a.triggerCh:
case <-ticker.C:
if err := a.keepUsageBelowThreshold(); err != nil {
a.logger.Printf("[ERR] client: error garbage collecting allocation: %v", err)
}
case <-a.shutdownCh:
ticker.Stop()
return
}
if err := a.keepUsageBelowThreshold(); err != nil {
a.logger.Printf("[ERR] client.gc: error garbage collecting allocation: %v", err)
}
}
}
// Force the garbage collector to run.
func (a *AllocGarbageCollector) Trigger() {
select {
case a.triggerCh <- struct{}{}:
default:
// already triggered
}
}
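A standalone sketch of the coalescing pattern `Trigger` relies on: a buffered channel of capacity 1 collapses any number of back-to-back triggers into a single pending wake-up (names here are illustrative):

```go
package main

func main() {
	triggerCh := make(chan struct{}, 1)

	// Three rapid triggers...
	for i := 0; i < 3; i++ {
		select {
		case triggerCh <- struct{}{}:
		default: // already triggered; drop
		}
	}

	// ...leave exactly one pending token for the run loop to drain,
	// so at most one extra GC pass runs.
	<-triggerCh
}
```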
@ -95,25 +123,16 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
}
// Check if we have enough free space
err := a.statsCollector.Collect()
if err != nil {
if err := a.statsCollector.Collect(); err != nil {
return err
}
// See if we are below thresholds for used disk space and inode usage
// TODO(diptanu) figure out why this is nil
stats := a.statsCollector.Stats()
if stats == nil {
break
}
diskStats := stats.AllocDirStats
if diskStats == nil {
break
}
diskStats := a.statsCollector.Stats().AllocDirStats
reason := ""
liveAllocs := a.allocCounter.NumAllocs()
switch {
case diskStats.UsedPercent > a.config.DiskUsageThreshold:
reason = fmt.Sprintf("disk usage of %.0f is over gc threshold of %.0f",
@ -121,19 +140,19 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
case diskStats.InodesUsedPercent > a.config.InodeUsageThreshold:
reason = fmt.Sprintf("inode usage of %.0f is over gc threshold of %.0f",
diskStats.InodesUsedPercent, a.config.InodeUsageThreshold)
case a.numAllocs() > a.config.MaxAllocs:
reason = fmt.Sprintf("number of allocations is over the limit (%d)", a.config.MaxAllocs)
case liveAllocs > a.config.MaxAllocs:
reason = fmt.Sprintf("number of allocations (%d) is over the limit (%d)", liveAllocs, a.config.MaxAllocs)
}
// No reason to gc, exit
if reason == "" {
// No reason to gc, exit
break
}
// Collect an allocation
gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
a.logger.Printf("[WARN] client: garbage collection due to %s skipped because no terminal allocations", reason)
a.logger.Printf("[WARN] client.gc: garbage collection due to %s skipped because no terminal allocations", reason)
break
}
@ -151,7 +170,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason strin
if alloc := ar.Alloc(); alloc != nil {
id = alloc.ID
}
a.logger.Printf("[INFO] client: garbage collecting allocation %s due to %s", id, reason)
a.logger.Printf("[INFO] client.gc: garbage collecting allocation %s due to %s", id, reason)
// Acquire the destroy lock
select {
@ -167,7 +186,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason strin
case <-a.shutdownCh:
}
a.logger.Printf("[DEBUG] client: garbage collected %q", ar.Alloc().ID)
a.logger.Printf("[DEBUG] client.gc: garbage collected %q", ar.Alloc().ID)
// Release the lock
<-a.destroyCh
@ -177,41 +196,47 @@ func (a *AllocGarbageCollector) Stop() {
close(a.shutdownCh)
}
// Collect garbage collects a single allocation on a node
func (a *AllocGarbageCollector) Collect(allocID string) error {
gcAlloc, err := a.allocRunners.Remove(allocID)
if err != nil {
return fmt.Errorf("unable to collect allocation %q: %v", allocID, err)
// Collect garbage collects a single allocation on a node. Returns true if
// alloc was found and garbage collected; otherwise false.
func (a *AllocGarbageCollector) Collect(allocID string) bool {
if gcAlloc := a.allocRunners.Remove(allocID); gcAlloc != nil {
a.destroyAllocRunner(gcAlloc.allocRunner, "forced collection")
return true
}
a.destroyAllocRunner(gcAlloc.allocRunner, "forced collection")
return nil
a.logger.Printf("[DEBUG] client.gc: alloc %s is invalid or was already garbage collected", allocID)
return false
}
// CollectAll garbage collects all terminated allocations on a node
func (a *AllocGarbageCollector) CollectAll() error {
func (a *AllocGarbageCollector) CollectAll() {
for {
select {
case <-a.shutdownCh:
return nil
return
default:
}
gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
break
return
}
go a.destroyAllocRunner(gcAlloc.allocRunner, "forced full collection")
go a.destroyAllocRunner(gcAlloc.allocRunner, "forced full node collection")
}
return nil
}
// MakeRoomFor garbage collects enough allocations in the terminal state to
// make room for new allocations
func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error {
if len(allocations) == 0 {
// Nothing to make room for!
return nil
}
// GC allocs until below the max limit + the new allocations
max := a.config.MaxAllocs - len(allocations)
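// e.g. with MaxAllocs = 6 and two incoming allocations, collect until
// NumAllocs() <= 4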
for a.numAllocs() > max {
for a.allocCounter.NumAllocs() > max {
select {
case <-a.shutdownCh:
return nil
@ -227,8 +252,9 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e
}
// Destroy the alloc runner and wait until it exits
a.destroyAllocRunner(gcAlloc.allocRunner, "new allocations")
a.destroyAllocRunner(gcAlloc.allocRunner, fmt.Sprintf("new allocations and over max (%d)", a.config.MaxAllocs))
}
totalResource := &structs.Resources{}
for _, alloc := range allocations {
if err := totalResource.Add(alloc.Resources); err != nil {
@ -303,26 +329,9 @@ func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) {
return
}
a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID)
a.allocRunners.Push(ar)
}
// Remove removes an alloc runner without garbage collecting it
func (a *AllocGarbageCollector) Remove(ar *AllocRunner) {
if ar == nil || ar.Alloc() == nil {
return
if a.allocRunners.Push(ar) {
a.logger.Printf("[INFO] client.gc: marking allocation %v for GC", ar.Alloc().ID)
}
alloc := ar.Alloc()
if _, err := a.allocRunners.Remove(alloc.ID); err == nil {
a.logger.Printf("[INFO] client: removed alloc runner %v from garbage collector", alloc.ID)
}
}
// numAllocs returns the total number of allocs tracked by the client as well
// as those marked for GC.
func (a *AllocGarbageCollector) numAllocs() int {
return a.allocRunners.Length() + a.allocCounter.NumAllocs()
}
// GCAlloc wraps an allocation runner and an index enabling it to be used within
@ -381,15 +390,16 @@ func NewIndexedGCAllocPQ() *IndexedGCAllocPQ {
}
}
// Push an alloc runner into the GC queue
func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) {
// Push an alloc runner into the GC queue. Returns true if alloc was added,
// false if the alloc already existed.
func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) bool {
i.pqLock.Lock()
defer i.pqLock.Unlock()
alloc := ar.Alloc()
if _, ok := i.index[alloc.ID]; ok {
// No work to do
return
return false
}
gcAlloc := &GCAlloc{
timeStamp: time.Now(),
@ -397,7 +407,7 @@ func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) {
}
i.index[alloc.ID] = gcAlloc
heap.Push(&i.heap, gcAlloc)
return
return true
}
func (i *IndexedGCAllocPQ) Pop() *GCAlloc {
@ -413,17 +423,18 @@ func (i *IndexedGCAllocPQ) Pop() *GCAlloc {
return gcAlloc
}
func (i *IndexedGCAllocPQ) Remove(allocID string) (*GCAlloc, error) {
// Remove alloc from GC. Returns nil if alloc doesn't exist.
func (i *IndexedGCAllocPQ) Remove(allocID string) *GCAlloc {
i.pqLock.Lock()
defer i.pqLock.Unlock()
if gcAlloc, ok := i.index[allocID]; ok {
heap.Remove(&i.heap, gcAlloc.index)
delete(i.index, allocID)
return gcAlloc, nil
return gcAlloc
}
return nil, fmt.Errorf("alloc %q not present", allocID)
return nil
}
func (i *IndexedGCAllocPQ) Length() int {

View file

@ -1,12 +1,15 @@
package client
import (
"fmt"
"testing"
"time"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)
func gcConfig() *GCConfig {
@ -128,9 +131,7 @@ func TestAllocGarbageCollector_Collect(t *testing.T) {
close(ar1.waitCh)
close(ar2.waitCh)
if err := gc.Collect(ar1.Alloc().ID); err != nil {
t.Fatalf("err: %v", err)
}
gc.Collect(ar1.Alloc().ID)
gcAlloc := gc.allocRunners.Pop()
if gcAlloc == nil || gcAlloc.allocRunner != ar2 {
t.Fatalf("bad gcAlloc: %v", gcAlloc)
@ -147,9 +148,7 @@ func TestAllocGarbageCollector_CollectAll(t *testing.T) {
gc.MarkForCollection(ar1)
gc.MarkForCollection(ar2)
if err := gc.CollectAll(); err != nil {
t.Fatalf("err: %v", err)
}
gc.CollectAll()
gcAlloc := gc.allocRunners.Pop()
if gcAlloc != nil {
t.Fatalf("bad gcAlloc: %v", gcAlloc)
@ -290,40 +289,132 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T)
}
}
func TestAllocGarbageCollector_MakeRoomForAllocations_MaxAllocs(t *testing.T) {
// TestAllocGarbageCollector_MaxAllocs asserts that when making room for new
// allocs, terminal allocs are GC'd until old_allocs + new_allocs <= limit
func TestAllocGarbageCollector_MaxAllocs(t *testing.T) {
t.Parallel()
const (
liveAllocs = 3
maxAllocs = 6
gcAllocs = 4
gcAllocsLeft = 1
)
server, serverAddr := testServer(t, nil)
defer server.Shutdown()
testutil.WaitForLeader(t, server.RPC)
logger := testLogger()
statsCollector := &MockStatsCollector{
availableValues: []uint64{10 * 1024 * MB},
usedPercents: []float64{0},
inodePercents: []float64{0},
}
allocCounter := &MockAllocCounter{allocs: liveAllocs}
conf := gcConfig()
conf.MaxAllocs = maxAllocs
gc := NewAllocGarbageCollector(logger, statsCollector, allocCounter, conf)
const maxAllocs = 6
client := testClient(t, func(c *config.Config) {
c.GCMaxAllocs = maxAllocs
c.GCDiskUsageThreshold = 100
c.GCInodeUsageThreshold = 100
c.GCParallelDestroys = 1
c.GCInterval = time.Hour
for i := 0; i < gcAllocs; i++ {
_, ar := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar.waitCh)
gc.MarkForCollection(ar)
c.RPCHandler = server
c.Servers = []string{serverAddr}
c.ConsulConfig.ClientAutoJoin = new(bool) // squelch logs
})
defer client.Shutdown()
waitTilNodeReady(client, t)
callN := 0
assertAllocs := func(expectedAll, expectedDestroyed int) {
// Wait for allocs to be started
callN++
client.logger.Printf("[TEST] %d -- Waiting for %d total allocs, %d GC'd", callN, expectedAll, expectedDestroyed)
testutil.WaitForResult(func() (bool, error) {
all, destroyed := 0, 0
for _, ar := range client.getAllocRunners() {
all++
if ar.IsDestroyed() {
destroyed++
}
}
return all == expectedAll && destroyed == expectedDestroyed, fmt.Errorf(
"expected %d allocs (found %d); expected %d destroy (found %d)",
expectedAll, all, expectedDestroyed, destroyed,
)
}, func(err error) {
client.logger.Printf("[TEST] %d -- FAILED to find %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed)
t.Fatalf("%d alloc state: %v", callN, err)
})
client.logger.Printf("[TEST] %d -- Found %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed)
}
if err := gc.MakeRoomFor([]*structs.Allocation{mock.Alloc(), mock.Alloc()}); err != nil {
t.Fatalf("error making room for 2 new allocs: %v", err)
// Create a job
state := server.State()
job := mock.Job()
job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
job.TaskGroups[0].Tasks[0].Config["run_for"] = "30s"
nodeID := client.Node().ID
if err := state.UpsertJob(98, job); err != nil {
t.Fatalf("error upserting job: %v", err)
}
if err := state.UpsertJobSummary(99, mock.JobSummary(job.ID)); err != nil {
t.Fatalf("error upserting job summary: %v", err)
}
// There should be gcAllocsLeft alloc runners left to be collected
if n := len(gc.allocRunners.index); n != gcAllocsLeft {
t.Fatalf("expected %d remaining GC-able alloc runners but found %d", gcAllocsLeft, n)
newAlloc := func() *structs.Allocation {
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.Job = job
alloc.NodeID = nodeID
return alloc
}
// Create the allocations
allocs := make([]*structs.Allocation, 7)
for i := 0; i < len(allocs); i++ {
allocs[i] = newAlloc()
}
// Upsert a copy of the allocs as modifying the originals later would
// cause a race
{
allocsCopy := make([]*structs.Allocation, len(allocs))
for i, a := range allocs {
allocsCopy[i] = a.Copy()
}
if err := state.UpsertAllocs(100, allocsCopy); err != nil {
t.Fatalf("error upserting initial allocs: %v", err)
}
}
// 7 total, 0 GC'd
assertAllocs(7, 0)
// Set the first few as terminal so they're marked for gc
const terminalN = 4
for i := 0; i < terminalN; i++ {
// Copy the alloc so the pointers aren't shared
alloc := allocs[i].Copy()
alloc.DesiredStatus = structs.AllocDesiredStatusStop
allocs[i] = alloc
}
if err := state.UpsertAllocs(101, allocs[:terminalN]); err != nil {
t.Fatalf("error upserting stopped allocs: %v", err)
}
// 7 total, 1 GC'd to get down to limit of 6
assertAllocs(7, 1)
// Add one more alloc
if err := state.UpsertAllocs(102, []*structs.Allocation{newAlloc()}); err != nil {
t.Fatalf("error upserting new alloc: %v", err)
}
// 8 total, 1 more GC'd (2 total) to get down to limit of 6
// If this fails it may be due to the gc's Run and MakeRoomFor methods
// gc'ing concurrently. May have to disable gc's run loop if this test
// is flaky.
assertAllocs(8, 2)
// Add new allocs to cause the gc of old terminal ones
newAllocs := make([]*structs.Allocation, 4)
for i := 0; i < len(newAllocs); i++ {
newAllocs[i] = newAlloc()
}
if err := state.UpsertAllocs(200, newAllocs); err != nil {
t.Fatalf("error upserting %d new allocs: %v", len(newAllocs), err)
}
// 12 total, 4 GC'd total because all other allocs are alive
assertAllocs(12, 4)
}
func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) {
@ -391,39 +482,3 @@ func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) {
t.Fatalf("gcAlloc: %v", gcAlloc)
}
}
func TestAllocGarbageCollector_MaxAllocsThreshold(t *testing.T) {
t.Parallel()
const (
liveAllocs = 3
maxAllocs = 6
gcAllocs = 4
gcAllocsLeft = 1
)
logger := testLogger()
statsCollector := &MockStatsCollector{
availableValues: []uint64{1000},
usedPercents: []float64{0},
inodePercents: []float64{0},
}
allocCounter := &MockAllocCounter{allocs: liveAllocs}
conf := gcConfig()
conf.MaxAllocs = 4
gc := NewAllocGarbageCollector(logger, statsCollector, allocCounter, conf)
for i := 0; i < gcAllocs; i++ {
_, ar := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar.waitCh)
gc.MarkForCollection(ar)
}
if err := gc.keepUsageBelowThreshold(); err != nil {
t.Fatalf("error gc'ing: %v", err)
}
// We should have gc'd down to MaxAllocs
if n := len(gc.allocRunners.index); n != gcAllocsLeft {
t.Fatalf("expected remaining gc allocs (%d) to equal %d", n, gcAllocsLeft)
}
}

View file

@ -190,9 +190,6 @@ func (h *HostStatsCollector) Stats() *HostStats {
// toDiskStats merges UsageStat and PartitionStat to create a DiskStat
func (h *HostStatsCollector) toDiskStats(usage *disk.UsageStat, partitionStat *disk.PartitionStat) *DiskStats {
if usage == nil {
return nil
}
ds := DiskStats{
Size: usage.Total,
Used: usage.Used,

View file

@ -115,7 +115,8 @@ func (s *HTTPServer) ClientGCRequest(resp http.ResponseWriter, req *http.Request
return nil, structs.ErrPermissionDenied
}
return nil, s.agent.Client().CollectAllAllocs()
s.agent.Client().CollectAllAllocs()
return nil, nil
}
func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@ -131,7 +132,12 @@ func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http
} else if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilitySubmitJob) {
return nil, structs.ErrPermissionDenied
}
return nil, s.agent.Client().CollectAllocation(allocID)
if !s.agent.Client().CollectAllocation(allocID) {
// Could not find alloc
return nil, fmt.Errorf("unable to collect allocation: not present")
}
return nil, nil
}
func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {

View file

@ -152,6 +152,7 @@ tls {
ca_file = "foo"
cert_file = "bar"
key_file = "pipe"
rpc_upgrade_mode = true
verify_https_client = true
}
sentinel {

View file

@ -771,6 +771,7 @@ func parseTLSConfig(result **config.TLSConfig, list *ast.ObjectList) error {
"http",
"rpc",
"verify_server_hostname",
"rpc_upgrade_mode",
"ca_file",
"cert_file",
"key_file",

View file

@ -172,6 +172,7 @@ func TestConfig_Parse(t *testing.T) {
CAFile: "foo",
CertFile: "bar",
KeyFile: "pipe",
RPCUpgradeMode: true,
VerifyHTTPSClient: true,
},
HTTPAPIResponseHeaders: map[string]string{

View file

@ -214,6 +214,16 @@ func (c *AllocStatusCommand) Run(args []string) int {
}
func formatAllocBasicInfo(alloc *api.Allocation, client *api.Client, uuidLength int, verbose bool) (string, error) {
var formattedCreateTime, formattedModifyTime string
if verbose {
formattedCreateTime = formatUnixNanoTime(alloc.CreateTime)
formattedModifyTime = formatUnixNanoTime(alloc.ModifyTime)
} else {
formattedCreateTime = prettyTimeDiff(time.Unix(0, alloc.CreateTime), time.Now())
formattedModifyTime = prettyTimeDiff(time.Unix(0, alloc.ModifyTime), time.Now())
}
basic := []string{
fmt.Sprintf("ID|%s", limit(alloc.ID, uuidLength)),
fmt.Sprintf("Eval ID|%s", limit(alloc.EvalID, uuidLength)),
@ -225,7 +235,8 @@ func formatAllocBasicInfo(alloc *api.Allocation, client *api.Client, uuidLength
fmt.Sprintf("Client Description|%s", alloc.ClientDescription),
fmt.Sprintf("Desired Status|%s", alloc.DesiredStatus),
fmt.Sprintf("Desired Description|%s", alloc.DesiredDescription),
fmt.Sprintf("Created At|%s", formatUnixNanoTime(alloc.CreateTime)),
fmt.Sprintf("Created|%s", formattedCreateTime),
fmt.Sprintf("Modified|%s", formattedModifyTime),
}
if alloc.DeploymentID != "" {

View file

@ -128,9 +128,14 @@ func TestAllocStatusCommand_Run(t *testing.T) {
t.Fatalf("expected exit 0, got: %d", code)
}
out := ui.OutputWriter.String()
if !strings.Contains(out, "Created At") {
t.Fatalf("expected to have 'Created At' but saw: %s", out)
if !strings.Contains(out, "Created") {
t.Fatalf("expected to have 'Created' but saw: %s", out)
}
if !strings.Contains(out, "Modified") {
t.Fatalf("expected to have 'Modified' but saw: %s", out)
}
ui.OutputWriter.Reset()
if code := cmd.Run([]string{"-address=" + url, "-verbose", allocId1}); code != 0 {
@ -140,8 +145,8 @@ func TestAllocStatusCommand_Run(t *testing.T) {
if !strings.Contains(out, allocId1) {
t.Fatal("expected to find alloc id in output")
}
if !strings.Contains(out, "Created At") {
t.Fatalf("expected to have 'Created At' but saw: %s", out)
if !strings.Contains(out, "Created") {
t.Fatalf("expected to have 'Created' but saw: %s", out)
}
ui.OutputWriter.Reset()
@ -150,8 +155,8 @@ func TestAllocStatusCommand_Run(t *testing.T) {
t.Fatalf("expected exit 0, got: %d", code)
}
out = ui.OutputWriter.String()
if !strings.Contains(out, "Created At") {
t.Fatalf("expected to have 'Created At' but saw: %s", out)
if !strings.Contains(out, "Created") {
t.Fatalf("expected to have 'Created' but saw: %s", out)
}
ui.OutputWriter.Reset()

View file

@ -75,6 +75,116 @@ func formatTimeDifference(first, second time.Time, d time.Duration) string {
return second.Truncate(d).Sub(first.Truncate(d)).String()
}
// fmtInt formats v into the tail of buf.
// It returns the index where the output begins.
func fmtInt(buf []byte, v uint64) int {
w := len(buf)
for v > 0 {
w--
buf[w] = byte(v%10) + '0'
v /= 10
}
return w
}
// prettyTimeDiff prints a human-readable time difference.
// It uses abbreviated forms for each period - s for seconds, m for minutes, h for hours,
// d for days, mo for months, and y for years. The time difference is rounded to the
// nearest second, and the two most significant periods are kept. For example, if the
// time difference is 10 months, 12 days, 3 hours and 2 seconds, the string "10mo12d ago"
// is returned. Zero values return the empty string.
func prettyTimeDiff(first, second time.Time) string {
// handle zero values
if first.Second() == 0 {
return ""
}
// round to the nearest second
first = first.Round(time.Second)
second = second.Round(time.Second)
// calculate time difference in seconds
d := second.Sub(first)
u := uint64(d.Seconds())
var buf [32]byte
w := len(buf)
secs := u % 60
// track indexes of various periods
var indexes []int
if secs > 0 {
w--
buf[w] = 's'
// u is now seconds
w = fmtInt(buf[:w], secs)
indexes = append(indexes, w)
}
u /= 60
// u is now minutes
if u > 0 {
mins := u % 60
if mins > 0 {
w--
buf[w] = 'm'
w = fmtInt(buf[:w], mins)
indexes = append(indexes, w)
}
u /= 60
// u is now hours
if u > 0 {
hrs := u % 24
if hrs > 0 {
w--
buf[w] = 'h'
w = fmtInt(buf[:w], hrs)
indexes = append(indexes, w)
}
u /= 24
}
// u is now days
if u > 0 {
days := u % 30
if days > 0 {
w--
buf[w] = 'd'
w = fmtInt(buf[:w], days)
indexes = append(indexes, w)
}
u /= 30
}
// u is now months
if u > 0 {
months := u % 12
if months > 0 {
w--
buf[w] = 'o'
w--
buf[w] = 'm'
w = fmtInt(buf[:w], months)
indexes = append(indexes, w)
}
u /= 12
}
// u is now years
if u > 0 {
w--
buf[w] = 'y'
w = fmtInt(buf[:w], u)
indexes = append(indexes, w)
}
}
start := w
end := len(buf)
// truncate to the first two periods
num_periods := len(indexes)
if num_periods > 2 {
end = indexes[num_periods-3]
}
return string(buf[start:end]) + " ago"
}
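// Illustrative calls, matching the test cases later in this diff:
//   prettyTimeDiff(time.Now().Add(-80*time.Minute), time.Now())    // "1h20m ago"
//   prettyTimeDiff(time.Now().Add(-22165*time.Second), time.Now()) // "6h9m ago"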
// getLocalNodeID returns the node ID of the local Nomad Client and an error if
// it couldn't be determined or the Agent is not running in Client mode.
func getLocalNodeID(client *api.Client) (string, error) {

View file

@ -294,3 +294,35 @@ func TestJobGetter_HTTPServer(t *testing.T) {
t.Fatalf("Unexpected file")
}
}
func TestPrettyTimeDiff(t *testing.T) {
test_cases := []struct {
d time.Duration
exp string
}{
{-740 * time.Second, "12m20s ago"},
{-12 * time.Minute, "12m ago"},
{-60 * time.Minute, "1h ago"},
{-80 * time.Minute, "1h20m ago"},
{-6 * time.Hour, "6h ago"},
{-22165 * time.Second, "6h9m ago"},
{-100 * time.Hour, "4d4h ago"},
{-438000 * time.Minute, "10mo4d ago"},
{-20460 * time.Hour, "2y4mo ago"},
}
for _, tc := range test_cases {
t2 := time.Now().Add(tc.d)
out := prettyTimeDiff(t2, time.Now())
if out != tc.exp {
t.Fatalf("expected :%v but got :%v", tc.exp, out)
}
}
var t1 time.Time
out := prettyTimeDiff(t1, time.Now())
if out != "" {
t.Fatalf("Expected empty output but got:%v", out)
}
}

View file

@ -406,9 +406,9 @@ func formatAllocListStubs(stubs []*api.AllocationListStub, verbose bool, uuidLen
allocs := make([]string, len(stubs)+1)
if verbose {
allocs[0] = "ID|Eval ID|Node ID|Task Group|Version|Desired|Status|Created At"
allocs[0] = "ID|Eval ID|Node ID|Task Group|Version|Desired|Status|Created|Modified"
for i, alloc := range stubs {
allocs[i+1] = fmt.Sprintf("%s|%s|%s|%s|%d|%s|%s|%s",
allocs[i+1] = fmt.Sprintf("%s|%s|%s|%s|%d|%s|%s|%s|%s",
limit(alloc.ID, uuidLength),
limit(alloc.EvalID, uuidLength),
limit(alloc.NodeID, uuidLength),
@ -416,19 +416,23 @@ func formatAllocListStubs(stubs []*api.AllocationListStub, verbose bool, uuidLen
alloc.JobVersion,
alloc.DesiredStatus,
alloc.ClientStatus,
formatUnixNanoTime(alloc.CreateTime))
formatUnixNanoTime(alloc.CreateTime),
formatUnixNanoTime(alloc.ModifyTime))
}
} else {
allocs[0] = "ID|Node ID|Task Group|Version|Desired|Status|Created At"
allocs[0] = "ID|Node ID|Task Group|Version|Desired|Status|Created|Modified"
for i, alloc := range stubs {
allocs[i+1] = fmt.Sprintf("%s|%s|%s|%d|%s|%s|%s",
createTimePretty := prettyTimeDiff(time.Unix(0, alloc.CreateTime), time.Now())
modTimePretty := prettyTimeDiff(time.Unix(0, alloc.ModifyTime), time.Now())
allocs[i+1] = fmt.Sprintf("%s|%s|%s|%d|%s|%s|%s|%s",
limit(alloc.ID, uuidLength),
limit(alloc.NodeID, uuidLength),
alloc.TaskGroup,
alloc.JobVersion,
alloc.DesiredStatus,
alloc.ClientStatus,
formatUnixNanoTime(alloc.CreateTime))
createTimePretty,
modTimePretty)
}
}

View file

@ -113,9 +113,12 @@ func TestJobStatusCommand_Run(t *testing.T) {
if !strings.Contains(out, "Allocations") {
t.Fatalf("should dump allocations")
}
if !strings.Contains(out, "Created At") {
if !strings.Contains(out, "Created") {
t.Fatal("should have created header")
}
if !strings.Contains(out, "Modified") {
t.Fatal("should have modified header")
}
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
@ -138,6 +141,14 @@ func TestJobStatusCommand_Run(t *testing.T) {
if !strings.Contains(out, "job1_sfx") || strings.Contains(out, "job2_sfx") {
t.Fatalf("expected only job1_sfx, got: %s", out)
}
if !strings.Contains(out, "Created") {
t.Fatal("should have created header")
}
if !strings.Contains(out, "Modified") {
t.Fatal("should have modified header")
}
ui.OutputWriter.Reset()
// Query in short view mode

View file

@ -4,4 +4,11 @@ ROOT_DIRECTORY="$( dirname "$(dirname "$CURRENT_DIRECTORY")")"
docker run --privileged -v \
$ROOT_DIRECTORY:/gopkg/src/github.com/hashicorp/nomad \
-it nomad-e2e /bin/bash \
-c "cd gopkg/src/github.com/hashicorp/nomad/e2e/migrations && go test -integration"
-c "cd gopkg/src/github.com/hashicorp/nomad/e2e/migrations && go test --run \
TestJobMigrations -integration"
docker run --privileged \
-v $ROOT_DIRECTORY:/gopkg/src/github.com/hashicorp/nomad \
-it nomad-e2e /bin/bash \
-c "cd gopkg/src/github.com/hashicorp/nomad/e2e/migrations && go test --run \
TestMigrations_WithACLs -integration"

View file

@ -3,6 +3,7 @@ package e2e
import (
"bytes"
"flag"
"fmt"
"io/ioutil"
"os"
"os/exec"
@ -108,15 +109,26 @@ func isSuccess(execCmd *exec.Cmd, retries int, keyword string) (string, error) {
// allNodesAreReady attempts to query the status of a cluster a specific number
// of times
func allNodesAreReady(retries int) (string, error) {
cmd := exec.Command("nomad", "node-status")
func allNodesAreReady(retries int, flags string) (string, error) {
var cmd *exec.Cmd
if flags != "" {
cmd = exec.Command("nomad", "node-status", flags)
} else {
cmd = exec.Command("nomad", "node-status")
}
return isSuccess(cmd, retries, "initializing")
}
// jobIsReady attempts to query the status of a specific job a fixed number of
// times
func jobIsReady(retries int, jobName string) (string, error) {
cmd := exec.Command("nomad", "job", "status", jobName)
func jobIsReady(retries int, flags, jobName string) (string, error) {
var cmd *exec.Cmd
if flags != "" {
cmd = exec.Command("nomad", "job", "status", flags, jobName)
} else {
cmd = exec.Command("nomad", "job", "status", jobName)
}
return isSuccess(cmd, retries, "pending")
}
@ -146,6 +158,60 @@ func startCluster(clusterConfig []string) (func(), error) {
return f, nil
}
func bootstrapACL() (string, error) {
var bootstrapOut bytes.Buffer
bootstrapCmd := exec.Command("nomad", "acl", "bootstrap")
bootstrapCmd.Stdout = &bootstrapOut
if err := bootstrapCmd.Run(); err != nil {
return "", err
}
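// Expected output shape: the second line should look like
// "Secret ID = <token>", so take the last whitespace-separated field.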
parts := strings.Split(bootstrapOut.String(), "\n")
if len(parts) < 2 {
return "", fmt.Errorf("unexpected bootstrap output")
}
secretIDLine := strings.Split(parts[1], " ")
if secretIDLine[0] != "Secret" {
return "", fmt.Errorf("unable to find secret id in bootstrap output")
}
return secretIDLine[len(secretIDLine)-1], nil
}
func startACLServer(serverConfig string) (func(), string, error) {
cmd := exec.Command("nomad", "agent", "-config", serverConfig)
if err := cmd.Start(); err != nil {
return func() {}, "", err
}
f := func() {
cmd.Process.Kill()
}
var secretID string
var err error
testutil.WaitForResultRetries(2000, func() (bool, error) {
secretIDOutput, err := bootstrapACL()
if err != nil {
return false, err
}
secretID = secretIDOutput
return true, nil
}, func(cmd_err error) {
err = cmd_err
})
if err != nil {
return func() {}, "", err
}
return f, secretID, nil
}
func TestJobMigrations(t *testing.T) {
flag.Parse()
if !*integration {
@ -160,7 +226,7 @@ func TestJobMigrations(t *testing.T) {
assert.Nil(err)
defer stopCluster()
_, err = allNodesAreReady(10)
_, err = allNodesAreReady(10, "")
assert.Nil(err)
fh, err := ioutil.TempFile("", "nomad-sleep-1")
@ -175,7 +241,7 @@ func TestJobMigrations(t *testing.T) {
err = jobCmd.Run()
assert.Nil(err)
firstJobOutput, err := jobIsReady(20, "sleep")
firstJobOutput, err := jobIsReady(20, "", "sleep")
assert.Nil(err)
assert.NotContains(firstJobOutput, "failed")
assert.NotContains(firstJobOutput, "pending")
@ -185,15 +251,69 @@ func TestJobMigrations(t *testing.T) {
defer os.Remove(fh2.Name())
_, err = fh2.WriteString(sleepJobTwo)
assert.Nil(err)
secondJobCmd := exec.Command("nomad", "run", fh2.Name())
err = secondJobCmd.Run()
assert.Nil(err)
jobOutput, err := jobIsReady(20, "sleep")
jobOutput, err := jobIsReady(20, "", "sleep")
assert.Nil(err)
assert.NotContains(jobOutput, "failed")
assert.Contains(jobOutput, "complete")
}
func TestMigrations_WithACLs(t *testing.T) {
flag.Parse()
if !*integration {
t.Skip("skipping test in non-integration mode.")
}
t.Parallel()
assert := assert.New(t)
stopServer, secretID, err := startACLServer("server_acl.hcl")
assert.Nil(err)
defer stopServer()
clusterConfig := []string{"client1.hcl", "client2.hcl"}
stopCluster, err := startCluster(clusterConfig)
assert.Nil(err)
defer stopCluster()
_, err = allNodesAreReady(10, "-token="+secretID)
assert.Nil(err)
fh, err := ioutil.TempFile("", "nomad-sleep-1")
assert.Nil(err)
defer os.Remove(fh.Name())
_, err = fh.WriteString(sleepJobOne)
assert.Nil(err)
jobCmd := exec.Command("nomad", "run", "-token="+secretID, fh.Name())
err = jobCmd.Run()
assert.Nil(err)
_, err = jobIsReady(20, "-token="+secretID, "sleep")
assert.Nil(err)
fh2, err := ioutil.TempFile("", "nomad-sleep-2")
assert.Nil(err)
defer os.Remove(fh2.Name())
_, err = fh2.WriteString(sleepJobTwo)
assert.Nil(err)
secondJobCmd := exec.Command("nomad", "run", "-token="+secretID, fh2.Name())
err = secondJobCmd.Run()
assert.Nil(err)
jobOutput, err := jobIsReady(20, "-token="+secretID, "sleep")
assert.Nil(err)
assert.NotContains(jobOutput, "failed")
assert.NotContains(jobOutput, "pending")
assert.Contains(jobOutput, "complete")

View file

@ -0,0 +1,13 @@
log_level = "DEBUG"
data_dir = "/tmp/server1_acl"
server {
enabled = true
bootstrap_expect = 1
}
acl {
enabled = true
}

View file

@ -820,6 +820,11 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
return fmt.Errorf("must update at least one allocation")
}
// Update modified timestamp for client initiated allocation updates
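// (The timestamp is UnixNano; consumers convert it back with
// time.Unix(0, alloc.ModifyTime), as the CLI does.)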
now := time.Now().UTC().UnixNano()
for _, alloc := range args.Alloc {
alloc.ModifyTime = now
}
// Add this to the batch
n.updatesLock.Lock()
n.updates = append(n.updates, args.Alloc...)

View file

@ -1323,8 +1323,10 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) {
node.ModifyIndex = resp.Index
// Inject fake evaluations async
now := time.Now().UTC().UnixNano()
alloc := mock.Alloc()
alloc.NodeID = node.ID
alloc.ModifyTime = now
state := s1.fsm.State()
state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
start := time.Now()
@ -1363,6 +1365,32 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) {
t.Fatalf("bad: %#v", resp2.Allocs)
}
iter, err := state.AllocsByIDPrefix(nil, structs.DefaultNamespace, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
getAllocs := func(iter memdb.ResultIterator) []*structs.Allocation {
var allocs []*structs.Allocation
for {
raw := iter.Next()
if raw == nil {
break
}
allocs = append(allocs, raw.(*structs.Allocation))
}
return allocs
}
out := getAllocs(iter)
if len(out) != 1 {
t.Fatalf("Expected to get one allocation but got:%v", out)
}
if out[0].ModifyTime != now {
t.Fatalf("Invalid modify time %v", out[0].ModifyTime)
}
// Alloc updates fire watches
time.AfterFunc(100*time.Millisecond, func() {
allocUpdate := mock.Alloc()
@ -1675,6 +1703,10 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) {
if out.ClientStatus != structs.AllocClientStatusFailed {
t.Fatalf("Bad: %#v", out)
}
if out.ModifyTime <= 0 {
t.Fatalf("must have valid modify time but was %v", out.ModifyTime)
}
}
func TestClientEndpoint_BatchUpdate(t *testing.T) {

View file

@ -150,6 +150,7 @@ func (s *Server) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
if alloc.CreateTime == 0 {
alloc.CreateTime = now
}
alloc.ModifyTime = now
}
// Dispatch the Raft transaction

View file

@ -147,6 +147,16 @@ func TestPlanApply_applyPlan(t *testing.T) {
t.Fatalf("missing alloc")
}
if out.CreateTime <= 0 {
t.Fatalf("invalid create time %v", out.CreateTime)
}
if out.ModifyTime <= 0 {
t.Fatalf("invalid modify time %v", out.CreateTime)
}
if out.CreateTime != out.ModifyTime {
t.Fatalf("create time %v modify time %v must be equal", out.CreateTime, out.ModifyTime)
}
// Lookup the new deployment
dout, err := fsmState.DeploymentByID(ws, plan.Deployment.ID)
if err != nil {
@ -226,6 +236,10 @@ func TestPlanApply_applyPlan(t *testing.T) {
t.Fatalf("missing job")
}
if out.ModifyTime <= 0 {
t.Fatalf("must have valid modify time but was %v", out.ModifyTime)
}
// Lookup the allocation
out, err = s1.fsm.State().AllocByID(ws, alloc2.ID)
if err != nil {

View file

@ -100,9 +100,11 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
// Enforce TLS if EnableRPC is set
if s.config.TLSConfig.EnableRPC && !isTLS && RPCType(buf[0]) != rpcTLS {
s.logger.Printf("[WARN] nomad.rpc: Non-TLS connection attempted with RequireTLS set")
conn.Close()
return
if !s.config.TLSConfig.RPCUpgradeMode {
s.logger.Printf("[WARN] nomad.rpc: Non-TLS connection attempted from %s with RequireTLS set", conn.RemoteAddr().String())
conn.Close()
return
}
}
// Switch on the byte

View file

@ -3,10 +3,17 @@ package nomad
import (
"net"
"net/rpc"
"os"
"path"
"testing"
"time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
)
// rpcClient is a test helper method to return a ClientCodec to use to make rpc
@ -84,3 +91,83 @@ func TestRPC_forwardRegion(t *testing.T) {
t.Fatalf("err: %v", err)
}
}
func TestRPC_PlaintextRPCSucceedsWhenInUpgradeMode(t *testing.T) {
t.Parallel()
assert := assert.New(t)
const (
cafile = "../helper/tlsutil/testdata/ca.pem"
foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem"
)
dir := tmpDir(t)
defer os.RemoveAll(dir)
s1 := testServer(t, func(c *Config) {
c.DataDir = path.Join(dir, "node1")
c.TLSConfig = &config.TLSConfig{
EnableRPC: true,
VerifyServerHostname: true,
CAFile: cafile,
CertFile: foocert,
KeyFile: fookey,
RPCUpgradeMode: true,
}
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
// Create the register request
node := mock.Node()
req := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp)
assert.Nil(err)
// Check that heartbeatTimers has the heartbeat ID
_, ok := s1.heartbeatTimers[node.ID]
assert.True(ok)
}
func TestRPC_PlaintextRPCFailsWhenNotInUpgradeMode(t *testing.T) {
t.Parallel()
assert := assert.New(t)
const (
cafile = "../helper/tlsutil/testdata/ca.pem"
foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem"
)
dir := tmpDir(t)
defer os.RemoveAll(dir)
s1 := testServer(t, func(c *Config) {
c.DataDir = path.Join(dir, "node1")
c.TLSConfig = &config.TLSConfig{
EnableRPC: true,
VerifyServerHostname: true,
CAFile: cafile,
CertFile: foocert,
KeyFile: fookey,
}
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
node := mock.Node()
req := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp)
assert.NotNil(err)
}

View file

@ -1718,6 +1718,9 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a
// Update the modify index
copyAlloc.ModifyIndex = index
// Update the modify time
copyAlloc.ModifyTime = alloc.ModifyTime
if err := s.updateDeploymentWithAlloc(index, copyAlloc, exist, txn); err != nil {
return fmt.Errorf("error updating deployment: %v", err)
}

View file

@ -29,6 +29,11 @@ type TLSConfig struct {
// Must be provided to serve TLS connections.
KeyFile string `mapstructure:"key_file"`
// RPCUpgradeMode should be enabled when a cluster is being upgraded
// to TLS. Allows servers to accept both plaintext and TLS connections and
// should only be a temporary state.
RPCUpgradeMode bool `mapstructure:"rpc_upgrade_mode"`
// Verify connections to the HTTPS API
VerifyHTTPSClient bool `mapstructure:"verify_https_client"`
}

View file

@ -4605,6 +4605,9 @@ type Allocation struct {
// CreateTime is the time the allocation has finished scheduling and been
// verified by the plan applier.
CreateTime int64
// ModifyTime is the time the allocation was last updated.
ModifyTime int64
}
// Index returns the index of the allocation. If the allocation is from a task
@ -4746,6 +4749,7 @@ func (a *Allocation) Stub() *AllocListStub {
CreateIndex: a.CreateIndex,
ModifyIndex: a.ModifyIndex,
CreateTime: a.CreateTime,
ModifyTime: a.ModifyTime,
}
}
@ -4767,6 +4771,7 @@ type AllocListStub struct {
CreateIndex uint64
ModifyIndex uint64
CreateTime int64
ModifyTime int64
}
// AllocMetric is used to track various metrics while attempting

View file

@ -141,7 +141,8 @@ $ curl \
},
"CreateIndex": 54,
"ModifyIndex": 57,
"CreateTime": 1495747371794276400
"CreateTime": 1495747371794276400,
"ModifyTime": 1495747371794276400
}
]
```
@ -461,7 +462,8 @@ $ curl \
"CreateIndex": 54,
"ModifyIndex": 57,
"AllocModifyIndex": 54,
"CreateTime": 1495747371794276400
"CreateTime": 1495747371794276400,
"ModifyTime": 1495747371794276400
}
```

View file

@ -253,7 +253,8 @@ $ curl \
"DeploymentStatus": null,
"CreateIndex": 19,
"ModifyIndex": 22,
"CreateTime": 1498775380678486300
"CreateTime": 1498775380678486300,
"ModifyTime": 1498775380678486300
}
]
```

View file

@ -761,7 +761,8 @@ $ curl \
},
"CreateIndex": 9,
"ModifyIndex": 13,
"CreateTime": 1495755675944527600
"CreateTime": 1495755675944527600,
"ModifyTime": 1495755675944527600
}
]
```

View file

@ -504,7 +504,8 @@ $ curl \
"CreateIndex": 15052,
"ModifyIndex": 15057,
"AllocModifyIndex": 15052,
"CreateTime": 1502140975600438500
"CreateTime": 1502140975600438500,
"ModifyTime": 1502140975600438500
},
...
]

View file

@ -54,6 +54,10 @@ the [Agent's Gossip and RPC Encryption](/docs/agent/encryption.html).
a Nomad client makes the client use TLS for making RPC requests to the Nomad
servers.
- `rpc_upgrade_mode` `(bool: false)` - This option should be used only when the
cluster is being upgraded to TLS, and removed after the migration is
complete. This allows the agent to accept both TLS and plaintext traffic;
see the sketch after this list.
- `verify_https_client` `(bool: false)` - Specifies agents should require
client certificates for all incoming HTTPS requests. The client certificates
must be signed by the same CA as Nomad.
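As a rough illustration of the new field, here is how upgrade mode might be enabled programmatically through the `config.TLSConfig` struct this changeset extends (file paths are placeholders):

```go
package example

import "github.com/hashicorp/nomad/nomad/structs/config"

// upgradeModeTLS sketches a TLS config for a cluster mid-migration:
// RPCUpgradeMode lets servers accept both TLS and plaintext RPC.
func upgradeModeTLS() *config.TLSConfig {
	return &config.TLSConfig{
		EnableRPC:            true,
		VerifyServerHostname: true,
		CAFile:               "ca.pem",         // placeholder
		CertFile:             "server.pem",     // placeholder
		KeyFile:              "server-key.pem", // placeholder
		RPCUpgradeMode:       true,
	}
}
```

Once every node presents certificates, `rpc_upgrade_mode` should be switched back to `false` so plaintext connections are rejected again.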

View file

@ -11,7 +11,8 @@ description: >
The `alloc-status` command displays status information and metadata about an
existing allocation and its tasks. It can be useful while debugging to reveal
the underlying reasons for scheduling decisions or failures, as well as the
current state of its tasks.
current state of its tasks. As of Nomad 0.7.1, alloc status also shows allocation
modification time in addition to create time.
## Usage
@ -74,7 +75,8 @@ Client Status = running
Client Description = <none>
Desired Status = run
Desired Description = <none>
Created At = 07/25/17 16:12:48 UTC
Created = 5m ago
Modified = 5m ago
Deployment ID = 0c83a3b1
Deployment Health = healthy
@ -127,7 +129,8 @@ Client Status = running
Client Description = <none>
Desired Status = run
Desired Description = <none>
Created At = 07/25/17 16:12:48 UTC
Created = 07/25/17 16:12:48 UTC
Modified = 07/25/17 16:12:48 UTC
Deployment ID = 0c83a3b1-8a7b-136b-0e11-8383dc6c9276
Deployment Health = healthy
Evaluated Nodes = 1

View file

@ -22,7 +22,9 @@ the specific job is queried and displayed. Otherwise, a list of matching jobs an
information will be displayed.
If the ID is omitted, the command lists out all of the existing jobs and a few of
the most useful status fields for each.
the most useful status fields for each. As of Nomad 0.7.1, the allocation list also shows
each allocation's modification time in addition to its create time. When the `-verbose` flag
is not set, allocation creation and modify times are shown in a shortened relative time
format like `5m ago`.
## General Options
@ -38,7 +40,7 @@ the most useful status fields for each.
* `-short`: Display short output. Used only when a single node is being queried.
Drops verbose node allocation data from the output.
* `-verbose`: Show full information.
* `-verbose`: Show full information. Allocation create and modify times are shown in `yyyy/mm/dd hh:mm:ss` format.
## Examples
@ -95,8 +97,8 @@ Task Group Desired Placed Healthy Unhealthy
cache 1 1 1 0
Allocations
ID Node ID Task Group Version Desired Status Created At
478ce836 5ed166e8 cache 0 run running 07/25/17 15:53:04 UTC
ID Node ID Task Group Version Desired Status Created Modified
478ce836 5ed166e8 cache 0 run running 5m ago 5m ago
```
Full status information of a periodic job:
@ -187,11 +189,11 @@ Task Group Desired Placed Healthy Unhealthy
cache 5 4 4 0
Allocations
ID Node ID Task Group Version Desired Status Created At
048c1e9e 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC
250f9dec 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC
2eb772a1 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC
a17b7d3d 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC
ID Node ID Task Group Version Desired Status Created Modified
048c1e9e 3f38ecb4 cache 0 run running 5m ago 5m ago
250f9dec 3f38ecb4 cache 0 run running 5m ago 5m ago
2eb772a1 3f38ecb4 cache 0 run running 5m ago 5m ago
a17b7d3d 3f38ecb4 cache 0 run running 5m ago 5m ago
```
Full status information showing evaluations with a placement failure. The in
@ -240,9 +242,9 @@ Task Group Desired Placed Healthy Unhealthy
cache 5 4 4 0
Allocations
ID Node ID Task Group Version Desired Status Created At
048c1e9e 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC
250f9dec 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC
2eb772a1 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC
a17b7d3d 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC
ID Node ID Task Group Version Desired Status Created Modified
048c1e9e 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC 07/25/17 15:55:27 UTC
250f9dec 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC 07/25/17 15:55:27 UTC
2eb772a1 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC 07/25/17 15:55:27 UTC
a17b7d3d 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC 07/25/17 15:55:27 UTC
```

View file

@ -79,8 +79,8 @@ Task Group Desired Placed Healthy Unhealthy
cache 1 1 1 0
Allocations
ID Node ID Task Group Version Desired Status Created At
883269bf e42d6f19 cache 0 run running 10/31/17 22:58:40 UTC
ID Node ID Task Group Version Desired Status Created Modified
8ba85cef 171a583b cache 0 run running 5m ago 5m ago
```
Here we can see that the result of our evaluation was the creation of an
@ -101,8 +101,9 @@ Client Status = running
Client Description = <none>
Desired Status = run
Desired Description = <none>
Created At = 10/31/17 22:58:40 UTC
Deployment ID = b0a84e74
Created = 5m ago
Modified = 5m ago
Deployment ID = fa882a5b
Deployment Health = healthy
Task "redis" is "running"
@ -326,13 +327,10 @@ Task Group Desired Placed Healthy Unhealthy
cache 3 3 3 0
Allocations
ID Node ID Task Group Version Desired Status Created At
7dce5722 e42d6f19 cache 2 stop complete 11/01/17 17:31:16 UTC
8cfab5f4 e42d6f19 cache 2 stop complete 11/01/17 17:31:02 UTC
27bd4a41 e42d6f19 cache 2 stop complete 11/01/17 17:30:40 UTC
3249e320 e42d6f19 cache 1 stop complete 11/01/17 17:28:28 UTC
453b210f e42d6f19 cache 1 stop complete 11/01/17 17:28:28 UTC
883269bf e42d6f19 cache 1 stop complete 10/31/17 22:58:40 UTC
ID Node ID Task Group Version Desired Status Created Modified
8ace140d 2cfe061e cache 2 stop complete 5m ago 5m ago
8af5330a 2cfe061e cache 2 stop complete 6m ago 6m ago
df50c3ae 2cfe061e cache 2 stop complete 6m ago 6m ago
```
If we wanted to start the job again, we could simply `run` it again.