Batch client allocation updates to the server

This commit is contained in:
Alex Dadgar 2016-02-21 19:20:50 -08:00
parent 64ecbb7cc2
commit 281e2ca198
4 changed files with 94 additions and 67 deletions

View File

@ -30,7 +30,7 @@ const (
)
// AllocStateUpdater is used to update the status of an allocation
type AllocStateUpdater func(alloc *structs.Allocation) error
type AllocStateUpdater func(alloc *structs.Allocation)
// AllocRunner is used to wrap an allocation and provide the execution context.
type AllocRunner struct {
@ -262,9 +262,12 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
alloc.ClientStatus = structs.AllocClientStatusFailed
} else if running {
alloc.ClientStatus = structs.AllocClientStatusRunning
} else if dead && !pending {
} else if pending {
alloc.ClientStatus = structs.AllocClientStatusPending
} else if dead {
alloc.ClientStatus = structs.AllocClientStatusDead
}
return alloc
}
@ -273,42 +276,19 @@ func (r *AllocRunner) dirtySyncState() {
for {
select {
case <-r.dirtyCh:
r.retrySyncState(r.destroyCh)
r.syncStatus()
case <-r.destroyCh:
return
}
}
}
// retrySyncState is used to retry the state sync until success
func (r *AllocRunner) retrySyncState(stopCh chan struct{}) {
for {
if err := r.syncStatus(); err == nil {
// The Alloc State might have been re-computed so we are
// snapshoting only the alloc runner
r.saveAllocRunnerState()
return
}
select {
case <-time.After(allocSyncRetryIntv + randomStagger(allocSyncRetryIntv)):
case <-stopCh:
return
}
}
}
// syncStatus is used to run and sync the status when it changes
func (r *AllocRunner) syncStatus() error {
// Get a copy of our alloc.
// Get a copy of our alloc, update status server side and sync to disk
alloc := r.Alloc()
// Attempt to update the status
if err := r.updater(alloc); err != nil {
r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s",
alloc.ID, alloc.ClientStatus, err)
return err
}
return nil
r.updater(alloc)
return r.saveAllocRunnerState()
}
// setStatus is used to update the allocation status
@ -475,7 +455,7 @@ OUTER:
r.taskLock.Unlock()
// Final state sync
r.retrySyncState(nil)
r.syncStatus()
// Block until we should destroy the state of the alloc
r.handleDestroy()

View File

@ -16,13 +16,11 @@ import (
type MockAllocStateUpdater struct {
Count int
Allocs []*structs.Allocation
Err error
}
func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) error {
func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) {
m.Count += 1
m.Allocs = append(m.Allocs, alloc)
return m.Err
}
func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) {

View File

@ -58,6 +58,10 @@ const (
// nodeUpdateRetryIntv is how often the client checks for updates to the
// node attributes or meta map.
nodeUpdateRetryIntv = 5 * time.Second
// allocSyncIntv is the batching period of allocation updates before they
// are synced with the server.
allocSyncIntv = 200 * time.Millisecond
)
// DefaultConfig returns the default configuration
@ -100,6 +104,9 @@ type Client struct {
allocs map[string]*AllocRunner
allocLock sync.RWMutex
// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
@ -112,12 +119,13 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Create the client
c := &Client{
config: cfg,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
allocs: make(map[string]*AllocRunner),
shutdownCh: make(chan struct{}),
config: cfg,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
}
// Setup the Consul Service
@ -166,6 +174,9 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Begin periodic snapshotting of state.
go c.periodicSnapshot()
// Begin syncing allocations to the server
go c.allocSync()
// Start the client!
go c.run()
@ -816,19 +827,57 @@ func (c *Client) updateNodeStatus() error {
}
// updateAllocStatus is used to update the status of an allocation
func (c *Client) updateAllocStatus(alloc *structs.Allocation) error {
args := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
WriteRequest: structs.WriteRequest{Region: c.config.Region},
}
var resp structs.GenericResponse
err := c.RPC("Node.UpdateAlloc", &args, &resp)
if err != nil {
c.logger.Printf("[ERR] client: failed to update allocation: %v", err)
return err
}
func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
// Only send the fields that are updatable by the client.
stripped := new(structs.Allocation)
stripped.ID = alloc.ID
stripped.TaskStates = alloc.TaskStates
stripped.ClientStatus = alloc.ClientStatus
stripped.ClientDescription = alloc.ClientDescription
c.allocUpdates <- stripped
}
return nil
// allocSync is a long lived function that batches allocation updates to the
// server.
func (c *Client) allocSync() {
timeoutTimer := time.NewTimer(allocSyncIntv)
timeoutCh := timeoutTimer.C
updates := make(map[string]*structs.Allocation)
for {
select {
case <-c.shutdownCh:
return
case alloc := <-c.allocUpdates:
// Batch the allocation updates until the timer triggers.
updates[alloc.ID] = alloc
case <-timeoutCh:
// Reset the timer
timeoutTimer.Reset(allocSyncIntv)
// Fast path if there are no updates
if len(updates) == 0 {
continue
}
sync := make([]*structs.Allocation, 0, len(updates))
for _, alloc := range updates {
sync = append(sync, alloc)
}
// Send to server.
args := structs.AllocUpdateRequest{
Alloc: sync,
WriteRequest: structs.WriteRequest{Region: c.config.Region},
}
var resp structs.GenericResponse
if err := c.RPC("Node.UpdateAlloc", &args, &resp); err != nil {
c.logger.Printf("[ERR] client: failed to update allocations: %v", err)
} else {
updates = make(map[string]*structs.Allocation)
}
}
}
}
// allocUpdates holds the results of receiving updated allocations from the

View File

@ -324,27 +324,27 @@ func TestClient_UpdateAllocStatus(t *testing.T) {
alloc := mock.Alloc()
alloc.NodeID = c1.Node().ID
originalStatus := "foo"
alloc.ClientStatus = originalStatus
state := s1.State()
state.UpsertAllocs(100, []*structs.Allocation{alloc})
newAlloc := new(structs.Allocation)
*newAlloc = *alloc
newAlloc.ClientStatus = structs.AllocClientStatusRunning
err := c1.updateAllocStatus(newAlloc)
if err != nil {
testutil.WaitForResult(func() (bool, error) {
out, err := state.AllocByID(alloc.ID)
if err != nil {
return false, err
}
if out == nil {
return false, fmt.Errorf("no such alloc")
}
if out.ClientStatus == originalStatus {
return false, fmt.Errorf("Alloc client status not updated; got %v", out.ClientStatus)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
}
out, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil || out.ClientStatus != structs.AllocClientStatusRunning {
t.Fatalf("bad: %#v", out)
}
})
}
func TestClient_WatchAllocs(t *testing.T) {