Making the allocs hold service ids

This commit is contained in:
Diptanu Choudhury 2015-12-14 14:53:49 -08:00
parent e7593cccc2
commit 76486d71e2
7 changed files with 74 additions and 50 deletions

View File

@ -47,6 +47,7 @@ type Allocation struct {
TaskGroup string
Resources *Resources
TaskResources map[string]*Resources
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
DesiredDescription string

View File

@ -112,7 +112,7 @@ func (r *AllocRunner) RestoreState() error {
task := &structs.Task{Name: name}
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.tasks[name] = tr
@ -324,7 +324,7 @@ func (r *AllocRunner) Run() {
task.Resources = alloc.TaskResources[task.Name]
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.tasks[task.Name] = tr
go tr.Run()

View File

@ -62,8 +62,8 @@ func (a *consulApiClient) Checks() (map[string]*consul.AgentCheck, error) {
// trackedTask is a Task that we are tracking for changes in service and check
// definitions and keep them synced with Consul Agent
type trackedTask struct {
allocID string
task *structs.Task
task *structs.Task
alloc *structs.Allocation
}
// ConsulService is the service which tracks tasks and syncs the services and
@ -143,15 +143,16 @@ func NewConsulService(config *consulServiceConfig) (*ConsulService, error) {
// Register starts tracking a task for changes to its services and checks and
// adds/removes services and checks associated with it.
func (c *ConsulService) Register(task *structs.Task, allocID string) error {
func (c *ConsulService) Register(task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
c.trackedTskLock.Lock()
tt := &trackedTask{allocID: allocID, task: task}
c.trackedTasks[fmt.Sprintf("%s-%s", allocID, task.Name)] = tt
tt := &trackedTask{task: task, alloc: alloc}
c.trackedTasks[fmt.Sprintf("%s-%s", alloc.ID, task.Name)] = tt
c.trackedTskLock.Unlock()
for _, service := range task.Services {
c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name)
if err := c.registerService(service, task, allocID); err != nil {
if err := c.registerService(service, task, alloc); err != nil {
fmt.Printf("DIPTANU ERR %v\n", err)
mErr.Errors = append(mErr.Errors, err)
}
}
@ -161,17 +162,18 @@ func (c *ConsulService) Register(task *structs.Task, allocID string) error {
// Deregister stops tracking a task for changes to its services and checks and
// removes all the services and checks associated with the Task
func (c *ConsulService) Deregister(task *structs.Task, allocID string) error {
func (c *ConsulService) Deregister(task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
c.trackedTskLock.Lock()
delete(c.trackedTasks, fmt.Sprintf("%s-%s", allocID, task.Name))
delete(c.trackedTasks, fmt.Sprintf("%s-%s", alloc.ID, task.Name))
c.trackedTskLock.Unlock()
for _, service := range task.Services {
if service.Id == "" {
serviceId := alloc.Services[service.Name]
if serviceId == "" {
continue
}
c.logger.Printf("[INFO] consul: deregistering service %v with consul", service.Name)
if err := c.deregisterService(service.Id); err != nil {
if err := c.deregisterService(serviceId); err != nil {
c.printLogMessage("[DEBUG] consul: error in deregistering service %v from consul", service.Name)
mErr.Errors = append(mErr.Errors, err)
}
@ -223,28 +225,30 @@ func (c *ConsulService) performSync() {
// Add services and checks which Consul doesn't know about
for _, trackedTask := range c.trackedTasks {
for _, service := range trackedTask.task.Services {
serviceId := trackedTask.alloc.Services[service.Name]
// Add new services which Consul agent isn't aware of
knownServices[service.Id] = struct{}{}
if _, ok := consulServices[service.Id]; !ok {
knownServices[serviceId] = struct{}{}
if _, ok := consulServices[serviceId]; !ok {
c.printLogMessage("[INFO] consul: registering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.allocID)
c.registerService(service, trackedTask.task, trackedTask.alloc)
continue
}
// If a service has changed, re-register it with Consul agent
if service.Hash() != c.serviceStates[service.Id] {
if service.Hash() != c.serviceStates[serviceId] {
c.printLogMessage("[INFO] consul: reregistering service %s with consul.", service.Name)
c.registerService(service, trackedTask.task, trackedTask.allocID)
c.registerService(service, trackedTask.task, trackedTask.alloc)
continue
}
// Add new checks that Consul isn't aware of
for _, check := range service.Checks {
knownChecks[check.Id] = struct{}{}
if _, ok := consulChecks[check.Id]; !ok {
checkId := check.Hash(serviceId)
knownChecks[checkId] = struct{}{}
if _, ok := consulChecks[checkId]; !ok {
host, port := trackedTask.task.FindHostAndPortFor(service.PortLabel)
cr := c.makeCheck(service, check, host, port)
cr := c.makeCheck(serviceId, check, host, port)
c.registerCheck(cr)
}
}
@ -276,16 +280,17 @@ func (c *ConsulService) performSync() {
}
// registerService registers a Service with Consul
func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, allocID string) error {
func (c *ConsulService) registerService(service *structs.Service, task *structs.Task, alloc *structs.Allocation) error {
var mErr multierror.Error
host, port := task.FindHostAndPortFor(service.PortLabel)
if host == "" || port == 0 {
return fmt.Errorf("consul: the port:%q marked for registration of service: %q couldn't be found", service.PortLabel, service.Name)
}
c.serviceStates[service.Id] = service.Hash()
serviceId := alloc.Services[service.Name]
c.serviceStates[serviceId] = service.Hash()
asr := &consul.AgentServiceRegistration{
ID: service.Id,
ID: serviceId,
Name: service.Name,
Tags: service.Tags,
Port: port,
@ -297,7 +302,7 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs.
mErr.Errors = append(mErr.Errors, err)
}
for _, check := range service.Checks {
cr := c.makeCheck(service, check, host, port)
cr := c.makeCheck(serviceId, check, host, port)
if err := c.registerCheck(cr); err != nil {
c.printLogMessage("[DEBUG] consul: error while registerting check %v with consul: %v", check.Name, err)
mErr.Errors = append(mErr.Errors, err)
@ -329,11 +334,12 @@ func (c *ConsulService) deregisterService(serviceId string) error {
}
// makeCheck creates a Consul Check Registration struct
func (c *ConsulService) makeCheck(service *structs.Service, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration {
func (c *ConsulService) makeCheck(serviceId string, check *structs.ServiceCheck, ip string, port int) *consul.AgentCheckRegistration {
checkId := check.Hash(serviceId)
cr := &consul.AgentCheckRegistration{
ID: check.Id,
ID: checkId,
Name: check.Name,
ServiceID: service.Id,
ServiceID: serviceId,
}
cr.Interval = check.Interval.String()
cr.Timeout = check.Timeout.String()

View File

@ -23,7 +23,7 @@ type TaskRunner struct {
updater TaskStateUpdater
logger *log.Logger
ctx *driver.ExecContext
allocID string
alloc *structs.Allocation
restartTracker restartTracker
consulService *ConsulService
@ -52,7 +52,7 @@ type TaskStateUpdater func(taskName string)
// NewTaskRunner is used to create a new task context
func NewTaskRunner(logger *log.Logger, config *config.Config,
updater TaskStateUpdater, ctx *driver.ExecContext,
allocID string, task *structs.Task, state *structs.TaskState,
alloc *structs.Allocation, task *structs.Task, state *structs.TaskState,
restartTracker restartTracker, consulService *ConsulService) *TaskRunner {
tc := &TaskRunner{
@ -62,7 +62,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
restartTracker: restartTracker,
consulService: consulService,
ctx: ctx,
allocID: allocID,
alloc: alloc,
task: task,
state: state,
updateCh: make(chan *structs.Task, 8),
@ -85,7 +85,7 @@ func (r *TaskRunner) stateFilePath() string {
dirName := fmt.Sprintf("task-%s", hashHex)
// Generate the path
path := filepath.Join(r.config.StateDir, "alloc", r.allocID,
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID,
dirName, "state.json")
return path
}
@ -113,7 +113,7 @@ func (r *TaskRunner) RestoreState() error {
// In the case it fails, we relaunch the task in the Run() method.
if err != nil {
r.logger.Printf("[ERR] client: failed to open handle to task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
r.task.Name, r.alloc.ID, err)
return nil
}
r.handle = handle
@ -176,7 +176,7 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) {
driver, err := driver.NewDriver(r.task.Driver, driverCtx)
if err != nil {
err = fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
r.task.Driver, r.allocID, err)
r.task.Driver, r.alloc.ID, err)
r.logger.Printf("[ERR] client: %s", err)
}
return driver, err
@ -196,7 +196,7 @@ func (r *TaskRunner) startTask() error {
handle, err := driver.Start(r.ctx, r.task)
if err != nil {
r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
r.task.Name, r.alloc.ID, err)
e := structs.NewTaskEvent(structs.TaskDriverFailure).
SetDriverError(fmt.Errorf("failed to start: %v", err))
r.setState(structs.TaskStateDead, e)
@ -211,7 +211,7 @@ func (r *TaskRunner) startTask() error {
func (r *TaskRunner) Run() {
defer close(r.waitCh)
r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')",
r.task.Name, r.allocID)
r.task.Name, r.alloc.ID)
r.run()
return
@ -234,10 +234,10 @@ func (r *TaskRunner) run() {
destroyed := false
// Register the services defined by the task with Consul
r.consulService.Register(r.task, r.allocID)
r.consulService.Register(r.task, r.alloc)
// De-Register the services belonging to the task from consul
defer r.consulService.Deregister(r.task, r.allocID)
defer r.consulService.Deregister(r.task, r.alloc)
OUTER:
// Wait for updates
@ -249,7 +249,7 @@ func (r *TaskRunner) run() {
// Update
r.task = update
if err := r.handle.Update(update); err != nil {
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err)
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
}
case <-r.destroyCh:
// Avoid destroying twice
@ -259,7 +259,7 @@ func (r *TaskRunner) run() {
// Send the kill signal, and use the WaitCh to block until complete
if err := r.handle.Kill(); err != nil {
r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err)
r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
destroyErr = err
}
destroyed = true
@ -274,16 +274,16 @@ func (r *TaskRunner) run() {
// Log whether the task was successful or not.
if !waitRes.Successful() {
r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.allocID, waitRes)
r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, waitRes)
} else {
r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.allocID)
r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.alloc.ID)
}
// Check if we should restart. If not mark task as dead and exit.
shouldRestart, when := r.restartTracker.nextRestart(waitRes.ExitCode)
waitEvent := r.waitErrorToEvent(waitRes)
if !shouldRestart {
r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.allocID)
r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID)
r.setState(structs.TaskStateDead, waitEvent)
return
}
@ -329,7 +329,7 @@ func (r *TaskRunner) Update(update *structs.Task) {
case r.updateCh <- update:
default:
r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')",
update.Name, r.allocID)
update.Name, r.alloc.ID)
}
}

View File

@ -1106,7 +1106,6 @@ const (
// The ServiceCheck data model represents the consul health check that
// Nomad registers for a Task
type ServiceCheck struct {
Id string // Id of the check, must be unique and it is autogenrated
Name string // Name of the check, defaults to id
Type string // Type of the check - tcp, http, docker and script
Script string // Script to invoke for script check
@ -1151,7 +1150,6 @@ const (
// The Service model represents a Consul service definition
type Service struct {
Id string // Id of the service, this needs to be unique on a local machine
Name string // Name of the service, defaults to id
Tags []string // List of tags for the service
PortLabel string `mapstructure:"port"` // port for the service
@ -1161,10 +1159,6 @@ type Service struct {
// InitFields interpolates values of Job, Task Group and Task in the Service
// Name. This also generates check names, service id and check ids.
func (s *Service) InitFields(job string, taskGroup string, task string) {
// We add a prefix to the Service ID so that we can know that this service
// is managed by Consul since Consul can also have service which are not
// managed by Nomad
s.Id = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID())
s.Name = args.ReplaceEnv(s.Name, map[string]string{
"JOB": job,
"TASKGROUP": taskGroup,
@ -1174,7 +1168,6 @@ func (s *Service) InitFields(job string, taskGroup string, task string) {
)
for _, check := range s.Checks {
check.Id = check.Hash(s.Id)
if check.Name == "" {
check.Name = fmt.Sprintf("service: %q check", s.Name)
}
@ -1451,6 +1444,9 @@ type Allocation struct {
// task. These should sum to the total Resources.
TaskResources map[string]*Resources
// Services is a map of service names and service ids
Services map[string]string
// Metrics associated with this allocation
Metrics *AllocMetric
@ -1504,6 +1500,19 @@ func (a *Allocation) Stub() *AllocListStub {
}
}
// PopulateServiceIds (re)builds a.Services, assigning a freshly generated ID
// to every service declared by the tasks of the allocation's task group.
//
// The map is keyed by service name, so if two tasks in the group declare a
// service with the same name, the later one overwrites the earlier entry.
// Any previous contents of a.Services are discarded.
func (a *Allocation) PopulateServiceIds() {
	a.Services = make(map[string]string)

	// Without a job there is nothing to look up; leave the map empty rather
	// than panicking on a nil dereference.
	if a.Job == nil {
		return
	}

	// NOTE(review): LookupTaskGroup is assumed to return nil when the group
	// name is unknown — confirm against its definition.
	tg := a.Job.LookupTaskGroup(a.TaskGroup)
	if tg == nil {
		return
	}

	for _, task := range tg.Tasks {
		for _, service := range task.Services {
			// We add a prefix to the Service ID so that we can know that this
			// service is managed by Nomad, since Consul can also have services
			// which are not managed by Nomad.
			a.Services[service.Name] = fmt.Sprintf("%s-%s", NomadConsulPrefix, GenerateUUID())
		}
	}
}
// AllocListStub is used to return a subset of alloc information
type AllocListStub struct {
ID string

View File

@ -279,6 +279,10 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
Metrics: s.ctx.Metrics(),
}
// Generate the service ids for the tasks which this allocation is going
// to run
alloc.PopulateServiceIds()
// Set fields based on if we found an allocation option
if option != nil {
alloc.NodeID = option.Node.ID

View File

@ -246,6 +246,10 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
Metrics: s.ctx.Metrics(),
}
// Generate the service ids for the tasks that this allocation is going
// to run
alloc.PopulateServiceIds()
// Set fields based on if we found an allocation option
if option != nil {
alloc.NodeID = option.Node.ID