commit
503fa1eed1
55
acl/acl.go
55
acl/acl.go
|
@ -52,6 +52,12 @@ type ACL interface {
|
|||
// ServiceRead checks for permission to read a given service
|
||||
ServiceRead(string) bool
|
||||
|
||||
// EventRead determines if a specific event can be queried.
|
||||
EventRead(string) bool
|
||||
|
||||
// EventWrite determines if a specific event may be fired.
|
||||
EventWrite(string) bool
|
||||
|
||||
// ACLList checks for permission to list all the ACLs
|
||||
ACLList() bool
|
||||
|
||||
|
@ -87,6 +93,14 @@ func (s *StaticACL) ServiceWrite(string) bool {
|
|||
return s.defaultAllow
|
||||
}
|
||||
|
||||
func (s *StaticACL) EventRead(string) bool {
|
||||
return s.defaultAllow
|
||||
}
|
||||
|
||||
func (s *StaticACL) EventWrite(string) bool {
|
||||
return s.defaultAllow
|
||||
}
|
||||
|
||||
func (s *StaticACL) ACLList() bool {
|
||||
return s.allowManage
|
||||
}
|
||||
|
@ -136,6 +150,9 @@ type PolicyACL struct {
|
|||
|
||||
// serviceRules contains the service policies
|
||||
serviceRules *radix.Tree
|
||||
|
||||
// eventRules contains the user event policies
|
||||
eventRules *radix.Tree
|
||||
}
|
||||
|
||||
// New is used to construct a policy based ACL from a set of policies
|
||||
|
@ -145,6 +162,7 @@ func New(parent ACL, policy *Policy) (*PolicyACL, error) {
|
|||
parent: parent,
|
||||
keyRules: radix.New(),
|
||||
serviceRules: radix.New(),
|
||||
eventRules: radix.New(),
|
||||
}
|
||||
|
||||
// Load the key policy
|
||||
|
@ -156,6 +174,12 @@ func New(parent ACL, policy *Policy) (*PolicyACL, error) {
|
|||
for _, sp := range policy.Services {
|
||||
p.serviceRules.Insert(sp.Name, sp.Policy)
|
||||
}
|
||||
|
||||
// Load the event policy
|
||||
for _, ep := range policy.Events {
|
||||
p.eventRules.Insert(ep.Event, ep.Policy)
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
|
@ -266,6 +290,37 @@ func (p *PolicyACL) ServiceWrite(name string) bool {
|
|||
return p.parent.ServiceWrite(name)
|
||||
}
|
||||
|
||||
// EventRead is used to determine if the policy allows for a
|
||||
// specific user event to be read.
|
||||
func (p *PolicyACL) EventRead(name string) bool {
|
||||
// Longest-prefix match on event names
|
||||
if _, rule, ok := p.eventRules.LongestPrefix(name); ok {
|
||||
switch rule {
|
||||
case EventPolicyRead:
|
||||
return true
|
||||
case EventPolicyWrite:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing matched, use parent
|
||||
return p.parent.EventRead(name)
|
||||
}
|
||||
|
||||
// EventWrite is used to determine if new events can be created
|
||||
// (fired) by the policy.
|
||||
func (p *PolicyACL) EventWrite(name string) bool {
|
||||
// Longest-prefix match event names
|
||||
if _, rule, ok := p.eventRules.LongestPrefix(name); ok {
|
||||
return rule == EventPolicyWrite
|
||||
}
|
||||
|
||||
// No match, use parent
|
||||
return p.parent.EventWrite(name)
|
||||
}
|
||||
|
||||
// ACLList checks if listing of ACLs is allowed
|
||||
func (p *PolicyACL) ACLList() bool {
|
||||
return p.parent.ACLList()
|
||||
|
|
|
@ -66,11 +66,23 @@ func TestStaticACL(t *testing.T) {
|
|||
if none.ServiceWrite("foobar") {
|
||||
t.Fatalf("should not allow")
|
||||
}
|
||||
if none.EventRead("foobar") {
|
||||
t.Fatalf("should not allow")
|
||||
}
|
||||
if none.EventRead("") {
|
||||
t.Fatalf("should not allow")
|
||||
}
|
||||
if none.EventWrite("foobar") {
|
||||
t.Fatalf("should not allow")
|
||||
}
|
||||
if none.EventWrite("") {
|
||||
t.Fatalf("should not allow")
|
||||
}
|
||||
if none.ACLList() {
|
||||
t.Fatalf("should not noneow")
|
||||
t.Fatalf("should not allow")
|
||||
}
|
||||
if none.ACLModify() {
|
||||
t.Fatalf("should not noneow")
|
||||
t.Fatalf("should not allow")
|
||||
}
|
||||
|
||||
if !manage.KeyRead("foobar") {
|
||||
|
@ -132,6 +144,20 @@ func TestPolicyACL(t *testing.T) {
|
|||
Policy: ServicePolicyWrite,
|
||||
},
|
||||
},
|
||||
Events: []*EventPolicy{
|
||||
&EventPolicy{
|
||||
Event: "",
|
||||
Policy: EventPolicyRead,
|
||||
},
|
||||
&EventPolicy{
|
||||
Event: "foo",
|
||||
Policy: EventPolicyWrite,
|
||||
},
|
||||
&EventPolicy{
|
||||
Event: "bar",
|
||||
Policy: EventPolicyDeny,
|
||||
},
|
||||
},
|
||||
}
|
||||
acl, err := New(all, policy)
|
||||
if err != nil {
|
||||
|
@ -188,6 +214,27 @@ func TestPolicyACL(t *testing.T) {
|
|||
t.Fatalf("Write fail: %#v", c)
|
||||
}
|
||||
}
|
||||
|
||||
type eventcase struct {
|
||||
inp string
|
||||
read bool
|
||||
write bool
|
||||
}
|
||||
eventcases := []eventcase{
|
||||
{"foo", true, true},
|
||||
{"foobar", true, true},
|
||||
{"bar", false, false},
|
||||
{"barbaz", false, false},
|
||||
{"baz", true, false},
|
||||
}
|
||||
for _, c := range eventcases {
|
||||
if c.read != acl.EventRead(c.inp) {
|
||||
t.Fatalf("Event fail: %#v", c)
|
||||
}
|
||||
if c.write != acl.EventWrite(c.inp) {
|
||||
t.Fatalf("Event fail: %#v", c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPolicyACL_Parent(t *testing.T) {
|
||||
|
|
|
@ -13,6 +13,9 @@ const (
|
|||
ServicePolicyDeny = "deny"
|
||||
ServicePolicyRead = "read"
|
||||
ServicePolicyWrite = "write"
|
||||
EventPolicyRead = "read"
|
||||
EventPolicyWrite = "write"
|
||||
EventPolicyDeny = "deny"
|
||||
)
|
||||
|
||||
// Policy is used to represent the policy specified by
|
||||
|
@ -21,6 +24,7 @@ type Policy struct {
|
|||
ID string `hcl:"-"`
|
||||
Keys []*KeyPolicy `hcl:"key,expand"`
|
||||
Services []*ServicePolicy `hcl:"service,expand"`
|
||||
Events []*EventPolicy `hcl:"event,expand"`
|
||||
}
|
||||
|
||||
// KeyPolicy represents a policy for a key
|
||||
|
@ -43,6 +47,16 @@ func (k *ServicePolicy) GoString() string {
|
|||
return fmt.Sprintf("%#v", *k)
|
||||
}
|
||||
|
||||
// EventPolicy represents a user event policy.
|
||||
type EventPolicy struct {
|
||||
Event string `hcl:",key"`
|
||||
Policy string
|
||||
}
|
||||
|
||||
func (e *EventPolicy) GoString() string {
|
||||
return fmt.Sprintf("%#v", *e)
|
||||
}
|
||||
|
||||
// Parse is used to parse the specified ACL rules into an
|
||||
// intermediary set of policies, before being compiled into
|
||||
// the ACL
|
||||
|
@ -80,5 +94,16 @@ func Parse(rules string) (*Policy, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// Validate the user event policies
|
||||
for _, ep := range p.Events {
|
||||
switch ep.Policy {
|
||||
case EventPolicyRead:
|
||||
case EventPolicyWrite:
|
||||
case EventPolicyDeny:
|
||||
default:
|
||||
return nil, fmt.Errorf("Invalid event policy: %#v", ep)
|
||||
}
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
|
|
@ -24,6 +24,15 @@ service "" {
|
|||
}
|
||||
service "foo" {
|
||||
policy = "read"
|
||||
}
|
||||
event "" {
|
||||
policy = "read"
|
||||
}
|
||||
event "foo" {
|
||||
policy = "write"
|
||||
}
|
||||
event "bar" {
|
||||
policy = "deny"
|
||||
}
|
||||
`
|
||||
exp := &Policy{
|
||||
|
@ -55,6 +64,20 @@ service "foo" {
|
|||
Policy: ServicePolicyRead,
|
||||
},
|
||||
},
|
||||
Events: []*EventPolicy{
|
||||
&EventPolicy{
|
||||
Event: "",
|
||||
Policy: EventPolicyRead,
|
||||
},
|
||||
&EventPolicy{
|
||||
Event: "foo",
|
||||
Policy: EventPolicyWrite,
|
||||
},
|
||||
&EventPolicy{
|
||||
Event: "bar",
|
||||
Policy: EventPolicyDeny,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
out, err := Parse(inp)
|
||||
|
@ -90,6 +113,17 @@ func TestParse_JSON(t *testing.T) {
|
|||
"foo": {
|
||||
"policy": "read"
|
||||
}
|
||||
},
|
||||
"event": {
|
||||
"": {
|
||||
"policy": "read"
|
||||
},
|
||||
"foo": {
|
||||
"policy": "write"
|
||||
},
|
||||
"bar": {
|
||||
"policy": "deny"
|
||||
}
|
||||
}
|
||||
}`
|
||||
exp := &Policy{
|
||||
|
@ -121,6 +155,20 @@ func TestParse_JSON(t *testing.T) {
|
|||
Policy: ServicePolicyRead,
|
||||
},
|
||||
},
|
||||
Events: []*EventPolicy{
|
||||
&EventPolicy{
|
||||
Event: "",
|
||||
Policy: EventPolicyRead,
|
||||
},
|
||||
&EventPolicy{
|
||||
Event: "foo",
|
||||
Policy: EventPolicyWrite,
|
||||
},
|
||||
&EventPolicy{
|
||||
Event: "bar",
|
||||
Policy: EventPolicyDeny,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
out, err := Parse(inp)
|
||||
|
|
|
@ -36,6 +36,10 @@ func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (int
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// Get the ACL token
|
||||
var token string
|
||||
s.parseToken(req, &token)
|
||||
|
||||
// Get the filters
|
||||
if filt := req.URL.Query().Get("node"); filt != "" {
|
||||
event.NodeFilter = filt
|
||||
|
@ -57,7 +61,13 @@ func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (int
|
|||
}
|
||||
|
||||
// Try to fire the event
|
||||
if err := s.agent.UserEvent(dc, event); err != nil {
|
||||
if err := s.agent.UserEvent(dc, token, event); err != nil {
|
||||
if strings.Contains(err.Error(), permissionDenied) {
|
||||
resp.WriteHeader(403)
|
||||
resp.Write([]byte(permissionDenied))
|
||||
return nil, nil
|
||||
}
|
||||
resp.WriteHeader(500)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
|
@ -5,9 +5,11 @@ import (
|
|||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
)
|
||||
|
||||
|
@ -51,10 +53,72 @@ func TestEventFire(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestEventFire_token(t *testing.T) {
|
||||
httpTestWithConfig(t, func(srv *HTTPServer) {
|
||||
// Create an ACL token
|
||||
args := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTypeClient,
|
||||
Rules: testEventPolicy,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var token string
|
||||
if err := srv.agent.RPC("ACL.Apply", &args, &token); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
type tcase struct {
|
||||
event string
|
||||
allowed bool
|
||||
}
|
||||
tcases := []tcase{
|
||||
{"foo", false},
|
||||
{"bar", false},
|
||||
{"baz", true},
|
||||
}
|
||||
for _, c := range tcases {
|
||||
// Try to fire the event over the HTTP interface
|
||||
url := fmt.Sprintf("/v1/event/fire/%s?token=%s", c.event, token)
|
||||
req, err := http.NewRequest("PUT", url, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
resp := httptest.NewRecorder()
|
||||
if _, err := srv.EventFire(resp, req); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Check the result
|
||||
body := resp.Body.String()
|
||||
if c.allowed {
|
||||
if strings.Contains(body, permissionDenied) {
|
||||
t.Fatalf("bad: %s", body)
|
||||
}
|
||||
if resp.Code != 200 {
|
||||
t.Fatalf("bad: %d", resp.Code)
|
||||
}
|
||||
} else {
|
||||
if !strings.Contains(body, permissionDenied) {
|
||||
t.Fatalf("bad: %s", body)
|
||||
}
|
||||
if resp.Code != 403 {
|
||||
t.Fatalf("bad: %d", resp.Code)
|
||||
}
|
||||
}
|
||||
}
|
||||
}, func(c *Config) {
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
})
|
||||
}
|
||||
|
||||
func TestEventList(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
p := &UserEvent{Name: "test"}
|
||||
if err := srv.agent.UserEvent("", p); err != nil {
|
||||
if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -90,12 +154,12 @@ func TestEventList(t *testing.T) {
|
|||
func TestEventList_Filter(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
p := &UserEvent{Name: "test"}
|
||||
if err := srv.agent.UserEvent("", p); err != nil {
|
||||
if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
p = &UserEvent{Name: "foo"}
|
||||
if err := srv.agent.UserEvent("", p); err != nil {
|
||||
if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -131,7 +195,7 @@ func TestEventList_Filter(t *testing.T) {
|
|||
func TestEventList_Blocking(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
p := &UserEvent{Name: "test"}
|
||||
if err := srv.agent.UserEvent("", p); err != nil {
|
||||
if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -159,7 +223,7 @@ func TestEventList_Blocking(t *testing.T) {
|
|||
go func() {
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
p := &UserEvent{Name: "second"}
|
||||
if err := srv.agent.UserEvent("", p); err != nil {
|
||||
if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}()
|
||||
|
@ -202,7 +266,7 @@ func TestEventList_EventBufOrder(t *testing.T) {
|
|||
expected,
|
||||
&UserEvent{Name: "bar"},
|
||||
} {
|
||||
if err := srv.agent.UserEvent("", e); err != nil {
|
||||
if err := srv.agent.UserEvent("dc1", "root", e); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ func validateUserEventParams(params *UserEvent) error {
|
|||
}
|
||||
|
||||
// UserEvent is used to fire an event via the Serf layer on the LAN
|
||||
func (a *Agent) UserEvent(dc string, params *UserEvent) error {
|
||||
func (a *Agent) UserEvent(dc, token string, params *UserEvent) error {
|
||||
// Validate the params
|
||||
if err := validateUserEventParams(params); err != nil {
|
||||
return err
|
||||
|
@ -85,27 +85,20 @@ func (a *Agent) UserEvent(dc string, params *UserEvent) error {
|
|||
return fmt.Errorf("UserEvent encoding failed: %v", err)
|
||||
}
|
||||
|
||||
// Check if this is the local DC, fire locally
|
||||
if dc == "" || dc == a.config.Datacenter {
|
||||
if a.server != nil {
|
||||
return a.server.UserEvent(params.Name, payload)
|
||||
} else {
|
||||
return a.client.UserEvent(params.Name, payload)
|
||||
}
|
||||
} else {
|
||||
// Send an RPC to remote datacenter to service this
|
||||
args := structs.EventFireRequest{
|
||||
Datacenter: dc,
|
||||
Name: params.Name,
|
||||
Payload: payload,
|
||||
}
|
||||
|
||||
// Any server can process in the remote DC, since the
|
||||
// gossip will take over anyways
|
||||
args.AllowStale = true
|
||||
var out structs.EventFireResponse
|
||||
return a.RPC("Internal.EventFire", &args, &out)
|
||||
// Service the event fire over RPC. This ensures that we authorize
|
||||
// the request against the token first.
|
||||
args := structs.EventFireRequest{
|
||||
Datacenter: dc,
|
||||
Name: params.Name,
|
||||
Payload: payload,
|
||||
QueryOptions: structs.QueryOptions{Token: token},
|
||||
}
|
||||
|
||||
// Any server can process in the remote DC, since the
|
||||
// gossip will take over anyways
|
||||
args.AllowStale = true
|
||||
var out structs.EventFireResponse
|
||||
return a.RPC("Internal.EventFire", &args, &out)
|
||||
}
|
||||
|
||||
// handleEvents is used to process incoming user events
|
||||
|
|
|
@ -153,6 +153,8 @@ func TestFireReceiveEvent(t *testing.T) {
|
|||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
|
||||
testutil.WaitForLeader(t, agent.RPC, "dc1")
|
||||
|
||||
srv1 := &structs.NodeService{
|
||||
ID: "mysql",
|
||||
Service: "mysql",
|
||||
|
@ -162,13 +164,13 @@ func TestFireReceiveEvent(t *testing.T) {
|
|||
agent.state.AddService(srv1, "")
|
||||
|
||||
p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"}
|
||||
err := agent.UserEvent("", p1)
|
||||
err := agent.UserEvent("dc1", "root", p1)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
p2 := &UserEvent{Name: "deploy"}
|
||||
err = agent.UserEvent("", p2)
|
||||
err = agent.UserEvent("dc1", "root", p2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -186,3 +188,66 @@ func TestFireReceiveEvent(t *testing.T) {
|
|||
t.Fatalf("bad: %#v", last)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUserEventToken(t *testing.T) {
|
||||
conf := nextConfig()
|
||||
|
||||
// Set the default policies to deny
|
||||
conf.ACLDefaultPolicy = "deny"
|
||||
|
||||
dir, agent := makeAgent(t, conf)
|
||||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
|
||||
testutil.WaitForLeader(t, agent.RPC, "dc1")
|
||||
|
||||
// Create an ACL token
|
||||
args := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTypeClient,
|
||||
Rules: testEventPolicy,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var token string
|
||||
if err := agent.RPC("ACL.Apply", &args, &token); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
type tcase struct {
|
||||
name string
|
||||
expect bool
|
||||
}
|
||||
cases := []tcase{
|
||||
{"foo", false},
|
||||
{"bar", false},
|
||||
{"baz", true},
|
||||
{"zip", false},
|
||||
}
|
||||
for _, c := range cases {
|
||||
event := &UserEvent{Name: c.name}
|
||||
err := agent.UserEvent("dc1", token, event)
|
||||
allowed := false
|
||||
if err == nil || err.Error() != permissionDenied {
|
||||
allowed = true
|
||||
}
|
||||
if allowed != c.expect {
|
||||
t.Fatalf("bad: %#v result: %v", c, allowed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const testEventPolicy = `
|
||||
event "foo" {
|
||||
policy = "deny"
|
||||
}
|
||||
event "bar" {
|
||||
policy = "read"
|
||||
}
|
||||
event "baz" {
|
||||
policy = "write"
|
||||
}
|
||||
`
|
||||
|
|
|
@ -33,12 +33,14 @@ Options:
|
|||
-service="" Regular expression to filter on service instances
|
||||
-tag="" Regular expression to filter on service tags. Must be used
|
||||
with -service.
|
||||
-token="" ACL token to use during requests. Defaults to that
|
||||
of the agent.
|
||||
`
|
||||
return strings.TrimSpace(helpText)
|
||||
}
|
||||
|
||||
func (c *EventCommand) Run(args []string) int {
|
||||
var datacenter, name, node, service, tag string
|
||||
var datacenter, name, node, service, tag, token string
|
||||
cmdFlags := flag.NewFlagSet("event", flag.ContinueOnError)
|
||||
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
|
||||
cmdFlags.StringVar(&datacenter, "datacenter", "", "")
|
||||
|
@ -46,6 +48,7 @@ func (c *EventCommand) Run(args []string) int {
|
|||
cmdFlags.StringVar(&node, "node", "", "")
|
||||
cmdFlags.StringVar(&service, "service", "", "")
|
||||
cmdFlags.StringVar(&tag, "tag", "", "")
|
||||
cmdFlags.StringVar(&token, "token", "", "")
|
||||
httpAddr := HTTPAddrFlag(cmdFlags)
|
||||
if err := cmdFlags.Parse(args); err != nil {
|
||||
return 1
|
||||
|
@ -120,6 +123,7 @@ func (c *EventCommand) Run(args []string) int {
|
|||
}
|
||||
opts := &consulapi.WriteOptions{
|
||||
Datacenter: datacenter,
|
||||
Token: token,
|
||||
}
|
||||
|
||||
// Fire the event
|
||||
|
|
|
@ -55,6 +55,7 @@ const (
|
|||
type rExecConf struct {
|
||||
datacenter string
|
||||
prefix string
|
||||
token string
|
||||
|
||||
foreignDC bool
|
||||
localDC string
|
||||
|
@ -136,6 +137,7 @@ func (c *ExecCommand) Run(args []string) int {
|
|||
cmdFlags.DurationVar(&c.conf.replWait, "wait-repl", rExecReplicationWait, "")
|
||||
cmdFlags.DurationVar(&c.conf.wait, "wait", rExecQuietWait, "")
|
||||
cmdFlags.BoolVar(&c.conf.verbose, "verbose", false, "")
|
||||
cmdFlags.StringVar(&c.conf.token, "token", "", "")
|
||||
httpAddr := HTTPAddrFlag(cmdFlags)
|
||||
if err := cmdFlags.Parse(args); err != nil {
|
||||
return 1
|
||||
|
@ -173,7 +175,11 @@ func (c *ExecCommand) Run(args []string) int {
|
|||
}
|
||||
|
||||
// Create and test the HTTP client
|
||||
client, err := HTTPClientDC(*httpAddr, c.conf.datacenter)
|
||||
client, err := HTTPClientConfig(func(clientConf *consulapi.Config) {
|
||||
clientConf.Address = *httpAddr
|
||||
clientConf.Datacenter = c.conf.datacenter
|
||||
clientConf.Token = c.conf.token
|
||||
})
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
|
||||
return 1
|
||||
|
@ -625,6 +631,8 @@ Options:
|
|||
-wait-repl=200ms Period to wait for replication before firing event. This is an
|
||||
optimization to allow stale reads to be performed.
|
||||
-verbose Enables verbose output
|
||||
-token="" ACL token to use during requests. Defaults to that
|
||||
of the agent.
|
||||
`
|
||||
return strings.TrimSpace(helpText)
|
||||
}
|
||||
|
|
|
@ -47,16 +47,15 @@ func HTTPAddrFlag(f *flag.FlagSet) *string {
|
|||
|
||||
// HTTPClient returns a new Consul HTTP client with the given address.
|
||||
func HTTPClient(addr string) (*consulapi.Client, error) {
|
||||
return HTTPClientDC(addr, "")
|
||||
return HTTPClientConfig(func(c *consulapi.Config) {
|
||||
c.Address = addr
|
||||
})
|
||||
}
|
||||
|
||||
// HTTPClientDC returns a new Consul HTTP client with the given address and datacenter
|
||||
func HTTPClientDC(addr, dc string) (*consulapi.Client, error) {
|
||||
// HTTPClientConfig is used to return a new API client and modify its
|
||||
// configuration by passing in a config modifier function.
|
||||
func HTTPClientConfig(fn func(c *consulapi.Config)) (*consulapi.Client, error) {
|
||||
conf := consulapi.DefaultConfig()
|
||||
if envAddr := os.Getenv(HTTPAddrEnvName); addr == "" && envAddr != "" {
|
||||
addr = envAddr
|
||||
}
|
||||
conf.Address = addr
|
||||
conf.Datacenter = dc
|
||||
fn(conf)
|
||||
return consulapi.NewClient(conf)
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/golang-lru"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -30,6 +31,9 @@ const (
|
|||
// anonymousToken is the token ID we re-write to if there
|
||||
// is no token ID provided
|
||||
anonymousToken = "anonymous"
|
||||
|
||||
// Maximum number of cached ACL entries
|
||||
aclCacheSize = 256
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -89,15 +93,57 @@ func (s *Server) resolveToken(id string) (acl.ACL, error) {
|
|||
}
|
||||
|
||||
// Use our non-authoritative cache
|
||||
return s.lookupACL(id, authDC)
|
||||
return s.aclCache.lookupACL(id, authDC)
|
||||
}
|
||||
|
||||
// rpcFn is used to make an RPC call to the client or server.
|
||||
type rpcFn func(string, interface{}, interface{}) error
|
||||
|
||||
// aclCache is used to cache ACL's and policies.
|
||||
type aclCache struct {
|
||||
config *Config
|
||||
logger *log.Logger
|
||||
|
||||
// acls is a non-authoritative ACL cache
|
||||
acls *lru.Cache
|
||||
|
||||
// aclPolicyCache is a policy cache
|
||||
policies *lru.Cache
|
||||
|
||||
// The RPC function used to talk to the client/server
|
||||
rpc rpcFn
|
||||
}
|
||||
|
||||
// newAclCache returns a new cache layer for ACLs and policies
|
||||
func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn) (*aclCache, error) {
|
||||
var err error
|
||||
cache := &aclCache{
|
||||
config: conf,
|
||||
logger: logger,
|
||||
rpc: rpc,
|
||||
}
|
||||
|
||||
// Initialize the non-authoritative ACL cache
|
||||
cache.acls, err = lru.New(aclCacheSize)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
|
||||
}
|
||||
|
||||
// Initialize the ACL policy cache
|
||||
cache.policies, err = lru.New(aclCacheSize)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to create ACL policy cache: %v", err)
|
||||
}
|
||||
|
||||
return cache, nil
|
||||
}
|
||||
|
||||
// lookupACL is used when we are non-authoritative, and need
|
||||
// to resolve an ACL
|
||||
func (s *Server) lookupACL(id, authDC string) (acl.ACL, error) {
|
||||
func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
|
||||
// Check the cache for the ACL
|
||||
var cached *aclCacheEntry
|
||||
raw, ok := s.aclCache.Get(id)
|
||||
raw, ok := c.acls.Get(id)
|
||||
if ok {
|
||||
cached = raw.(*aclCacheEntry)
|
||||
}
|
||||
|
@ -119,22 +165,22 @@ func (s *Server) lookupACL(id, authDC string) (acl.ACL, error) {
|
|||
args.ETag = cached.ETag
|
||||
}
|
||||
var out structs.ACLPolicy
|
||||
err := s.RPC("ACL.GetPolicy", &args, &out)
|
||||
err := c.rpc("ACL.GetPolicy", &args, &out)
|
||||
|
||||
// Handle the happy path
|
||||
if err == nil {
|
||||
return s.useACLPolicy(id, authDC, cached, &out)
|
||||
return c.useACLPolicy(id, authDC, cached, &out)
|
||||
}
|
||||
|
||||
// Check for not-found
|
||||
if strings.Contains(err.Error(), aclNotFound) {
|
||||
return nil, errors.New(aclNotFound)
|
||||
} else {
|
||||
s.logger.Printf("[ERR] consul.acl: Failed to get policy for '%s': %v", id, err)
|
||||
c.logger.Printf("[ERR] consul.acl: Failed to get policy for '%s': %v", id, err)
|
||||
}
|
||||
|
||||
// Unable to refresh, apply the down policy
|
||||
switch s.config.ACLDownPolicy {
|
||||
switch c.config.ACLDownPolicy {
|
||||
case "allow":
|
||||
return acl.AllowAll(), nil
|
||||
case "extend-cache":
|
||||
|
@ -148,7 +194,7 @@ func (s *Server) lookupACL(id, authDC string) (acl.ACL, error) {
|
|||
}
|
||||
|
||||
// useACLPolicy handles an ACLPolicy response
|
||||
func (s *Server) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *structs.ACLPolicy) (acl.ACL, error) {
|
||||
func (c *aclCache) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *structs.ACLPolicy) (acl.ACL, error) {
|
||||
// Check if we can used the cached policy
|
||||
if cached != nil && cached.ETag == p.ETag {
|
||||
if p.TTL > 0 {
|
||||
|
@ -159,7 +205,7 @@ func (s *Server) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *struc
|
|||
|
||||
// Check for a cached compiled policy
|
||||
var compiled acl.ACL
|
||||
raw, ok := s.aclPolicyCache.Get(p.ETag)
|
||||
raw, ok := c.policies.Get(p.ETag)
|
||||
if ok {
|
||||
compiled = raw.(acl.ACL)
|
||||
} else {
|
||||
|
@ -167,7 +213,7 @@ func (s *Server) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *struc
|
|||
parent := acl.RootACL(p.Parent)
|
||||
if parent == nil {
|
||||
var err error
|
||||
parent, err = s.lookupACL(p.Parent, authDC)
|
||||
parent, err = c.lookupACL(p.Parent, authDC)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -180,7 +226,7 @@ func (s *Server) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *struc
|
|||
}
|
||||
|
||||
// Cache the policy
|
||||
s.aclPolicyCache.Add(p.ETag, acl)
|
||||
c.policies.Add(p.ETag, acl)
|
||||
compiled = acl
|
||||
}
|
||||
|
||||
|
@ -192,7 +238,7 @@ func (s *Server) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *struc
|
|||
if p.TTL > 0 {
|
||||
cached.Expires = time.Now().Add(p.TTL)
|
||||
}
|
||||
s.aclCache.Add(id, cached)
|
||||
c.acls.Add(id, cached)
|
||||
return compiled, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -201,11 +201,6 @@ func (c *Client) RemoveFailedNode(node string) error {
|
|||
return c.serf.RemoveFailedNode(node)
|
||||
}
|
||||
|
||||
// UserEvent is used to fire an event via the Serf layer
|
||||
func (c *Client) UserEvent(name string, payload []byte) error {
|
||||
return c.serf.UserEvent(userEventName(name), payload, false)
|
||||
}
|
||||
|
||||
// KeyManagerLAN returns the LAN Serf keyring manager
|
||||
func (c *Client) KeyManagerLAN() *serf.KeyManager {
|
||||
return c.serf.KeyManager()
|
||||
|
|
|
@ -276,26 +276,18 @@ func TestClientServer_UserEvent(t *testing.T) {
|
|||
})
|
||||
|
||||
// Fire the user event
|
||||
err := c1.UserEvent("foo", []byte("bar"))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
err = s1.UserEvent("bar", []byte("baz"))
|
||||
if err != nil {
|
||||
if err := s1.UserEvent("foo", []byte("baz")); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Wait for all the events
|
||||
var serverFoo, serverBar, clientFoo, clientBar bool
|
||||
for i := 0; i < 4; i++ {
|
||||
var clientReceived, serverReceived bool
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case e := <-clientOut:
|
||||
switch e.Name {
|
||||
case "foo":
|
||||
clientFoo = true
|
||||
case "bar":
|
||||
clientBar = true
|
||||
clientReceived = true
|
||||
default:
|
||||
t.Fatalf("Bad: %#v", e)
|
||||
}
|
||||
|
@ -303,9 +295,7 @@ func TestClientServer_UserEvent(t *testing.T) {
|
|||
case e := <-serverOut:
|
||||
switch e.Name {
|
||||
case "foo":
|
||||
serverFoo = true
|
||||
case "bar":
|
||||
serverBar = true
|
||||
serverReceived = true
|
||||
default:
|
||||
t.Fatalf("Bad: %#v", e)
|
||||
}
|
||||
|
@ -315,7 +305,7 @@ func TestClientServer_UserEvent(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
if !(serverFoo && serverBar && clientFoo && clientBar) {
|
||||
if !serverReceived || !clientReceived {
|
||||
t.Fatalf("missing events")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,11 +57,22 @@ func (m *Internal) EventFire(args *structs.EventFireRequest,
|
|||
return err
|
||||
}
|
||||
|
||||
// Check ACLs
|
||||
acl, err := m.srv.resolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if acl != nil && !acl.EventWrite(args.Name) {
|
||||
m.srv.logger.Printf("[WARN] consul: user event %q blocked by ACLs", args.Name)
|
||||
return permissionDeniedErr
|
||||
}
|
||||
|
||||
// Set the query meta data
|
||||
m.srv.setQueryMeta(&reply.QueryMeta)
|
||||
|
||||
// Fire the event
|
||||
return m.srv.UserEvent(args.Name, args.Payload)
|
||||
return m.srv.serfLAN.UserEvent(args.Name, args.Payload, false)
|
||||
}
|
||||
|
||||
// KeyringOperation will query the WAN and LAN gossip keyrings of all nodes.
|
||||
|
|
|
@ -325,3 +325,37 @@ func TestInternal_NodeDump_FilterACL(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestInternal_EventFire_Token(t *testing.T) {
|
||||
dir, srv := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDownPolicy = "deny"
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
})
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
|
||||
client := rpcClient(t, srv)
|
||||
defer client.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
|
||||
// No token is rejected
|
||||
event := structs.EventFireRequest{
|
||||
Name: "foo",
|
||||
Datacenter: "dc1",
|
||||
Payload: []byte("nope"),
|
||||
}
|
||||
err := client.Call("Internal.EventFire", &event, nil)
|
||||
if err == nil || err.Error() != permissionDenied {
|
||||
t.Fatalf("bad: %s", err)
|
||||
}
|
||||
|
||||
// Root token is allowed to fire
|
||||
event.Token = "root"
|
||||
err = client.Call("Internal.EventFire", &event, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/golang-lru"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/raft-boltdb"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
|
@ -45,9 +44,6 @@ const (
|
|||
// open to a server
|
||||
serverMaxStreams = 64
|
||||
|
||||
// Maximum number of cached ACL entries
|
||||
aclCacheSize = 256
|
||||
|
||||
// raftLogCacheSize is the maximum number of logs to cache in-memory.
|
||||
// This is used to reduce disk I/O for the recently commited entries.
|
||||
raftLogCacheSize = 512
|
||||
|
@ -63,11 +59,8 @@ type Server struct {
|
|||
// aclAuthCache is the authoritative ACL cache
|
||||
aclAuthCache *acl.Cache
|
||||
|
||||
// aclCache is a non-authoritative ACL cache
|
||||
aclCache *lru.Cache
|
||||
|
||||
// aclPolicyCache is a policy cache
|
||||
aclPolicyCache *lru.Cache
|
||||
// aclCache is the non-authoritative ACL cache.
|
||||
aclCache *aclCache
|
||||
|
||||
// Consul configuration
|
||||
config *Config
|
||||
|
@ -228,18 +221,10 @@ func NewServer(config *Config) (*Server, error) {
|
|||
return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
|
||||
}
|
||||
|
||||
// Initialize the non-authoritative ACL cache
|
||||
s.aclCache, err = lru.New(aclCacheSize)
|
||||
if err != nil {
|
||||
// Set up the non-authoritative ACL cache
|
||||
if s.aclCache, err = newAclCache(config, logger, s.RPC); err != nil {
|
||||
s.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
|
||||
}
|
||||
|
||||
// Initialize the ACL policy cache
|
||||
s.aclPolicyCache, err = lru.New(aclCacheSize)
|
||||
if err != nil {
|
||||
s.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to create ACL policy cache: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize the RPC layer
|
||||
|
@ -631,11 +616,6 @@ func (s *Server) RemoveFailedNode(node string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// UserEvent is used to fire an event via the Serf layer on the LAN
|
||||
func (s *Server) UserEvent(name string, payload []byte) error {
|
||||
return s.serfLAN.UserEvent(userEventName(name), payload, false)
|
||||
}
|
||||
|
||||
// IsLeader checks if this server is the cluster leader
|
||||
func (s *Server) IsLeader() bool {
|
||||
return s.raft.State() == raft.Leader
|
||||
|
|
|
@ -56,3 +56,5 @@ The list of available flags are:
|
|||
a matching tag. This must be used with `-service`. As an example, you may
|
||||
do "-service mysql -tag slave".
|
||||
|
||||
* `-token` - The ACL token to use when firing the event. This token must have
|
||||
write-level privileges for the event specified. Defaults to that of the agent.
|
||||
|
|
|
@ -62,3 +62,6 @@ The list of available flags are:
|
|||
|
||||
* `-verbose` - Enables verbose output.
|
||||
|
||||
* `-token` - The ACL token to use during requests. This token must have access
|
||||
to the prefix in the KV store as well as exec "write" access for the _rexec
|
||||
event. Defaults to that of the agent.
|
||||
|
|
|
@ -19,7 +19,7 @@ on tokens to which fine grained rules can be applied. It is very similar to
|
|||
When the ACL system was launched in Consul 0.4, it was only possible to specify
|
||||
policies for the KV store. In Consul 0.5, ACL policies were extended to service
|
||||
registrations. In Consul 0.6, ACL's were further extended to restrict the
|
||||
service discovery mechanisms.
|
||||
service discovery mechanisms and user events..
|
||||
|
||||
## ACL Design
|
||||
|
||||
|
@ -126,6 +126,27 @@ The most secure way of handling service registration and discovery is to run
|
|||
Consul 0.6+ and issue tokens with explicit access for the services or service
|
||||
prefixes which are expected to run on each agent.
|
||||
|
||||
### Blacklist mode and Events
|
||||
|
||||
Similar to the above, if your
|
||||
[`acl_default_policy`](/docs/agent/options.html#acl_default_policy) is set to
|
||||
`deny`, the `anonymous` token will have no access to allow firing user events.
|
||||
This deviates from pre-0.6.0 builds, where user events were completely
|
||||
unrestricted.
|
||||
|
||||
Events have their own first-class expression in the ACL syntax. To restore
|
||||
access to user events from arbitrary agents, configure an ACL rule like the
|
||||
following for the `anonymous` token:
|
||||
|
||||
```
|
||||
event "" {
|
||||
policy = "write"
|
||||
}
|
||||
```
|
||||
|
||||
As always, the more secure way to handle user events is to explicitly grant
|
||||
access to each API token based on the events they should be able to fire.
|
||||
|
||||
### Bootstrapping ACLs
|
||||
|
||||
Bootstrapping the ACL system is done by providing an initial [`acl_master_token`
|
||||
|
@ -161,6 +182,12 @@ and ACLs can be found [below](#discovery_acls).
|
|||
|
||||
The policy for the "consul" service is always "write" as it is managed internally by Consul.
|
||||
|
||||
User event policies are defined by coupling an event name prefix with a policy.
|
||||
The rules are enforced using a longest-prefix match policy. The default rule,
|
||||
applied to any user event without a matching policy, is provided by an empty
|
||||
string. An event policy is one of "read", "write", or "deny". Currently, only
|
||||
the "write" level is enforced during event firing. Events can always be read.
|
||||
|
||||
We make use of
|
||||
the [HashiCorp Configuration Language (HCL)](https://github.com/hashicorp/hcl/)
|
||||
to specify policy. This language is human readable and interoperable
|
||||
|
@ -192,6 +219,16 @@ service "" {
|
|||
service "secure-" {
|
||||
policy = "read"
|
||||
}
|
||||
|
||||
# Allow firing any user event by default.
|
||||
event "" {
|
||||
policy = "write"
|
||||
}
|
||||
|
||||
# Deny firing events prefixed with "destroy-".
|
||||
event "destroy-" {
|
||||
policy = "deny"
|
||||
}
|
||||
```
|
||||
|
||||
This is equivalent to the following JSON input:
|
||||
|
@ -216,6 +253,14 @@ This is equivalent to the following JSON input:
|
|||
"secure-": {
|
||||
"policy": "read"
|
||||
}
|
||||
},
|
||||
"event": {
|
||||
"": {
|
||||
"policy": "write"
|
||||
},
|
||||
"destroy-": {
|
||||
"policy": "deny"
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
|
Loading…
Reference in New Issue