cd837b0b18
command/agent/* -> agent/* command/consul/* -> agent/consul/* command/agent/command{,_test}.go -> command/agent{,_test}.go command/base/command.go -> command/base.go command/base/* -> command/* commands.go -> command/commands.go The script which did the refactor is: ( cd $GOPATH/src/github.com/hashicorp/consul git mv command/agent/command.go command/agent.go git mv command/agent/command_test.go command/agent_test.go git mv command/agent/flag_slice_value{,_test}.go command/ git mv command/agent . git mv command/base/command.go command/base.go git mv command/base/config_util{,_test}.go command/ git mv commands.go command/ git mv consul agent rmdir command/base/ gsed -i -e 's|package agent|package command|' command/agent{,_test}.go gsed -i -e 's|package agent|package command|' command/flag_slice_value{,_test}.go gsed -i -e 's|package base|package command|' command/base.go command/config_util{,_test}.go gsed -i -e 's|package main|package command|' command/commands.go gsed -i -e 's|base.Command|BaseCommand|' command/commands.go gsed -i -e 's|agent.Command|AgentCommand|' command/commands.go gsed -i -e 's|\tCommand:|\tBaseCommand:|' command/commands.go gsed -i -e 's|base\.||' command/commands.go gsed -i -e 's|command\.||' command/commands.go gsed -i -e 's|command|c|' main.go gsed -i -e 's|range Commands|range command.Commands|' main.go gsed -i -e 's|Commands: Commands|Commands: command.Commands|' main.go gsed -i -e 's|base\.BoolValue|BoolValue|' command/operator_autopilot_set.go gsed -i -e 's|base\.DurationValue|DurationValue|' command/operator_autopilot_set.go gsed -i -e 's|base\.StringValue|StringValue|' command/operator_autopilot_set.go gsed -i -e 's|base\.UintValue|UintValue|' command/operator_autopilot_set.go gsed -i -e 's|\bCommand\b|BaseCommand|' command/base.go gsed -i -e 's|BaseCommand Options|Command Options|' command/base.go gsed -i -e 's|base.Command|BaseCommand|' command/*.go gsed -i -e 's|c\.Command|c.BaseCommand|g' command/*.go gsed -i -e 's|\tCommand:|\tBaseCommand:|' command/*_test.go gsed -i -e 's|base\.||' command/*_test.go gsed -i -e 's|\bCommand\b|AgentCommand|' command/agent{,_test}.go gsed -i -e 's|cmd.AgentCommand|cmd.BaseCommand|' command/agent.go gsed -i -e 's|cli.AgentCommand = new(Command)|cli.Command = new(AgentCommand)|' command/agent_test.go gsed -i -e 's|exec.AgentCommand|exec.Command|' command/agent_test.go gsed -i -e 's|exec.BaseCommand|exec.Command|' command/agent_test.go gsed -i -e 's|NewTestAgent|agent.NewTestAgent|' command/agent_test.go gsed -i -e 's|= TestConfig|= agent.TestConfig|' command/agent_test.go gsed -i -e 's|: RetryJoin|: agent.RetryJoin|' command/agent_test.go gsed -i -e 's|\.\./\.\./|../|' command/config_util_test.go gsed -i -e 's|\bverifyUniqueListeners|VerifyUniqueListeners|' agent/config{,_test}.go command/agent.go gsed -i -e 's|\bserfLANKeyring\b|SerfLANKeyring|g' agent/{agent,keyring,testagent}.go command/agent.go gsed -i -e 's|\bserfWANKeyring\b|SerfWANKeyring|g' agent/{agent,keyring,testagent}.go command/agent.go gsed -i -e 's|\bNewAgent\b|agent.New|g' command/agent{,_test}.go gsed -i -e 's|\bNewAgent|New|' agent/{acl_test,agent,testagent}.go gsed -i -e 's|\bAgent\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bBool\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bDefaultConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bDevConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bMergeConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bReadConfigPaths\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bParseMetaPair\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bSerfLANKeyring\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bSerfWANKeyring\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|circonus\.agent|circonus|g' command/agent{,_test}.go gsed -i -e 's|logger\.agent|logger|g' command/agent{,_test}.go gsed -i -e 's|metrics\.agent|metrics|g' command/agent{,_test}.go gsed -i -e 's|// agent.Agent|// agent|' command/agent{,_test}.go gsed -i -e 's|a\.agent\.Config|a.Config|' command/agent{,_test}.go gsed -i -e 's|agent\.AppendSliceValue|AppendSliceValue|' command/{configtest,validate}.go gsed -i -e 's|consul/consul|agent/consul|' GNUmakefile gsed -i -e 's|\.\./test|../../test|' agent/consul/server_test.go # fix imports f=$(grep -rl 'github.com/hashicorp/consul/command/agent' * | grep '\.go') gsed -i -e 's|github.com/hashicorp/consul/command/agent|github.com/hashicorp/consul/agent|' $f goimports -w $f f=$(grep -rl 'github.com/hashicorp/consul/consul' * | grep '\.go') gsed -i -e 's|github.com/hashicorp/consul/consul|github.com/hashicorp/consul/agent/consul|' $f goimports -w $f goimports -w command/*.go main.go )
266 lines
6.5 KiB
Go
266 lines
6.5 KiB
Go
package agent
|
|
|
|
import (
|
|
"fmt"
|
|
"regexp"
|
|
|
|
"github.com/hashicorp/consul/agent/consul/structs"
|
|
"github.com/hashicorp/go-uuid"
|
|
)
|
|
|
|
const (
|
|
// userEventMaxVersion is the maximum protocol version we understand
|
|
userEventMaxVersion = 1
|
|
|
|
// remoteExecName is the event name for a remote exec command
|
|
remoteExecName = "_rexec"
|
|
)
|
|
|
|
// UserEventParam is used to parameterize a user event
|
|
type UserEvent struct {
|
|
// ID of the user event. Automatically generated.
|
|
ID string
|
|
|
|
// Name of the event
|
|
Name string `codec:"n"`
|
|
|
|
// Optional payload
|
|
Payload []byte `codec:"p,omitempty"`
|
|
|
|
// NodeFilter is a regular expression to filter on nodes
|
|
NodeFilter string `codec:"nf,omitempty"`
|
|
|
|
// ServiceFilter is a regular expression to filter on services
|
|
ServiceFilter string `codec:"sf,omitempty"`
|
|
|
|
// TagFilter is a regular expression to filter on tags of a service,
|
|
// must be provided with ServiceFilter
|
|
TagFilter string `codec:"tf,omitempty"`
|
|
|
|
// Version of the user event. Automatically generated.
|
|
Version int `codec:"v"`
|
|
|
|
// LTime is the lamport time. Automatically generated.
|
|
LTime uint64 `codec:"-"`
|
|
}
|
|
|
|
// validateUserEventParams is used to sanity check the inputs
|
|
func validateUserEventParams(params *UserEvent) error {
|
|
// Validate the inputs
|
|
if params.Name == "" {
|
|
return fmt.Errorf("User event missing name")
|
|
}
|
|
if params.TagFilter != "" && params.ServiceFilter == "" {
|
|
return fmt.Errorf("Cannot provide tag filter without service filter")
|
|
}
|
|
if params.NodeFilter != "" {
|
|
if _, err := regexp.Compile(params.NodeFilter); err != nil {
|
|
return fmt.Errorf("Invalid node filter: %v", err)
|
|
}
|
|
}
|
|
if params.ServiceFilter != "" {
|
|
if _, err := regexp.Compile(params.ServiceFilter); err != nil {
|
|
return fmt.Errorf("Invalid service filter: %v", err)
|
|
}
|
|
}
|
|
if params.TagFilter != "" {
|
|
if _, err := regexp.Compile(params.TagFilter); err != nil {
|
|
return fmt.Errorf("Invalid tag filter: %v", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UserEvent is used to fire an event via the Serf layer on the LAN
|
|
func (a *Agent) UserEvent(dc, token string, params *UserEvent) error {
|
|
// Validate the params
|
|
if err := validateUserEventParams(params); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Format message
|
|
var err error
|
|
if params.ID, err = uuid.GenerateUUID(); err != nil {
|
|
return fmt.Errorf("UUID generation failed: %v", err)
|
|
}
|
|
params.Version = userEventMaxVersion
|
|
payload, err := encodeMsgPack(¶ms)
|
|
if err != nil {
|
|
return fmt.Errorf("UserEvent encoding failed: %v", err)
|
|
}
|
|
|
|
// Service the event fire over RPC. This ensures that we authorize
|
|
// the request against the token first.
|
|
args := structs.EventFireRequest{
|
|
Datacenter: dc,
|
|
Name: params.Name,
|
|
Payload: payload,
|
|
QueryOptions: structs.QueryOptions{Token: token},
|
|
}
|
|
|
|
// Any server can process in the remote DC, since the
|
|
// gossip will take over anyways
|
|
args.AllowStale = true
|
|
var out structs.EventFireResponse
|
|
return a.RPC("Internal.EventFire", &args, &out)
|
|
}
|
|
|
|
// handleEvents is used to process incoming user events
|
|
func (a *Agent) handleEvents() {
|
|
for {
|
|
select {
|
|
case e := <-a.eventCh:
|
|
// Decode the event
|
|
msg := new(UserEvent)
|
|
if err := decodeMsgPack(e.Payload, msg); err != nil {
|
|
a.logger.Printf("[ERR] agent: Failed to decode event: %v", err)
|
|
continue
|
|
}
|
|
msg.LTime = uint64(e.LTime)
|
|
|
|
// Skip if we don't pass filtering
|
|
if !a.shouldProcessUserEvent(msg) {
|
|
continue
|
|
}
|
|
|
|
// Ingest the event
|
|
a.ingestUserEvent(msg)
|
|
|
|
case <-a.shutdownCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// shouldProcessUserEvent checks if an event makes it through our filters
|
|
func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool {
|
|
// Check the version
|
|
if msg.Version > userEventMaxVersion {
|
|
a.logger.Printf("[WARN] agent: Event version %d may have unsupported features (%s)",
|
|
msg.Version, msg.Name)
|
|
}
|
|
|
|
// Apply the filters
|
|
if msg.NodeFilter != "" {
|
|
re, err := regexp.Compile(msg.NodeFilter)
|
|
if err != nil {
|
|
a.logger.Printf("[ERR] agent: Failed to parse node filter '%s' for event '%s': %v",
|
|
msg.NodeFilter, msg.Name, err)
|
|
return false
|
|
}
|
|
if !re.MatchString(a.config.NodeName) {
|
|
return false
|
|
}
|
|
}
|
|
|
|
if msg.ServiceFilter != "" {
|
|
re, err := regexp.Compile(msg.ServiceFilter)
|
|
if err != nil {
|
|
a.logger.Printf("[ERR] agent: Failed to parse service filter '%s' for event '%s': %v",
|
|
msg.ServiceFilter, msg.Name, err)
|
|
return false
|
|
}
|
|
|
|
var tagRe *regexp.Regexp
|
|
if msg.TagFilter != "" {
|
|
re, err := regexp.Compile(msg.TagFilter)
|
|
if err != nil {
|
|
a.logger.Printf("[ERR] agent: Failed to parse tag filter '%s' for event '%s': %v",
|
|
msg.TagFilter, msg.Name, err)
|
|
return false
|
|
}
|
|
tagRe = re
|
|
}
|
|
|
|
// Scan for a match
|
|
services := a.state.Services()
|
|
found := false
|
|
OUTER:
|
|
for name, info := range services {
|
|
// Check the service name
|
|
if !re.MatchString(name) {
|
|
continue
|
|
}
|
|
if tagRe == nil {
|
|
found = true
|
|
break
|
|
}
|
|
|
|
// Look for a matching tag
|
|
for _, tag := range info.Tags {
|
|
if !tagRe.MatchString(tag) {
|
|
continue
|
|
}
|
|
found = true
|
|
break OUTER
|
|
}
|
|
}
|
|
|
|
// No matching services
|
|
if !found {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// ingestUserEvent is used to process an event that passes filtering
|
|
func (a *Agent) ingestUserEvent(msg *UserEvent) {
|
|
// Special handling for internal events
|
|
switch msg.Name {
|
|
case remoteExecName:
|
|
if *a.config.DisableRemoteExec {
|
|
a.logger.Printf("[INFO] agent: ignoring remote exec event (%s), disabled.", msg.ID)
|
|
} else {
|
|
go a.handleRemoteExec(msg)
|
|
}
|
|
return
|
|
default:
|
|
a.logger.Printf("[DEBUG] agent: new event: %s (%s)", msg.Name, msg.ID)
|
|
}
|
|
|
|
a.eventLock.Lock()
|
|
defer func() {
|
|
a.eventLock.Unlock()
|
|
a.eventNotify.Notify()
|
|
}()
|
|
|
|
idx := a.eventIndex
|
|
a.eventBuf[idx] = msg
|
|
a.eventIndex = (idx + 1) % len(a.eventBuf)
|
|
}
|
|
|
|
// UserEvents is used to return a slice of the most recent
|
|
// user events.
|
|
func (a *Agent) UserEvents() []*UserEvent {
|
|
n := len(a.eventBuf)
|
|
out := make([]*UserEvent, n)
|
|
a.eventLock.RLock()
|
|
defer a.eventLock.RUnlock()
|
|
|
|
// Check if the buffer is full
|
|
if a.eventBuf[a.eventIndex] != nil {
|
|
if a.eventIndex == 0 {
|
|
copy(out, a.eventBuf)
|
|
} else {
|
|
copy(out, a.eventBuf[a.eventIndex:])
|
|
copy(out[n-a.eventIndex:], a.eventBuf[:a.eventIndex])
|
|
}
|
|
} else {
|
|
// We haven't filled the buffer yet
|
|
copy(out, a.eventBuf[:a.eventIndex])
|
|
out = out[:a.eventIndex]
|
|
}
|
|
return out
|
|
}
|
|
|
|
// LastUserEvent is used to return the lastest user event.
|
|
// This will return nil if there is no recent event.
|
|
func (a *Agent) LastUserEvent() *UserEvent {
|
|
a.eventLock.RLock()
|
|
defer a.eventLock.RUnlock()
|
|
n := len(a.eventBuf)
|
|
idx := (((a.eventIndex - 1) % n) + n) % n
|
|
return a.eventBuf[idx]
|
|
}
|