Merge pull request #5615 from hashicorp/config-entry-rpc

Add RPC endpoints for config entry operations
This commit is contained in:
Kyle Havlovitz 2019-04-23 00:16:54 -07:00 committed by GitHub
commit d51fd740bf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 1054 additions and 61 deletions

View file

@ -0,0 +1,226 @@
package consul
import (
"fmt"
"time"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
)
// The ConfigEntry endpoint is used to query centralized config information
type ConfigEntry struct {
srv *Server
}
// Apply does an upsert of the given config entry.
func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *struct{}) error {
if done, err := c.srv.forward("ConfigEntry.Apply", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"config_entry", "apply"}, time.Now())
// Normalize and validate the incoming config entry.
if err := args.Entry.Normalize(); err != nil {
return err
}
if err := args.Entry.Validate(); err != nil {
return err
}
// Fetch the ACL token, if any.
rule, err := c.srv.ResolveToken(args.Token)
if err != nil {
return err
}
if rule != nil && !args.Entry.CanWrite(rule) {
return acl.ErrPermissionDenied
}
args.Op = structs.ConfigEntryUpsert
resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
}
// Get returns a single config entry by Kind/Name.
func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.IndexedConfigEntries) error {
if done, err := c.srv.forward("ConfigEntry.Get", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"config_entry", "get"}, time.Now())
// Fetch the ACL token, if any.
rule, err := c.srv.ResolveToken(args.Token)
if err != nil {
return err
}
// Create a dummy config entry to check the ACL permissions.
lookupEntry, err := structs.MakeConfigEntry(args.Kind, args.Name)
if err != nil {
return err
}
if rule != nil && !lookupEntry.CanRead(rule) {
return acl.ErrPermissionDenied
}
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, entry, err := state.ConfigEntry(ws, args.Kind, args.Name)
if err != nil {
return err
}
reply.Index = index
if entry == nil {
return nil
}
reply.Kind = args.Kind
reply.Entries = []structs.ConfigEntry{entry}
return nil
})
}
// List returns all the config entries of the given kind. If Kind is blank,
// all existing config entries will be returned.
func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.IndexedConfigEntries) error {
if done, err := c.srv.forward("ConfigEntry.List", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"config_entry", "list"}, time.Now())
// Fetch the ACL token, if any.
rule, err := c.srv.ResolveToken(args.Token)
if err != nil {
return err
}
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, entries, err := state.ConfigEntriesByKind(ws, args.Kind)
if err != nil {
return err
}
// Filter the entries returned by ACL permissions.
filteredEntries := make([]structs.ConfigEntry, 0, len(entries))
for _, entry := range entries {
if rule != nil && !entry.CanRead(rule) {
continue
}
filteredEntries = append(filteredEntries, entry)
}
reply.Kind = args.Kind
reply.Index = index
reply.Entries = filteredEntries
return nil
})
}
// Delete deletes a config entry.
func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{}) error {
if done, err := c.srv.forward("ConfigEntry.Delete", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"config_entry", "delete"}, time.Now())
// Normalize the incoming entry.
if err := args.Entry.Normalize(); err != nil {
return err
}
// Fetch the ACL token, if any.
rule, err := c.srv.ResolveToken(args.Token)
if err != nil {
return err
}
if rule != nil && !args.Entry.CanWrite(rule) {
return acl.ErrPermissionDenied
}
args.Op = structs.ConfigEntryDelete
resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
}
// ResolveServiceConfig
func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, reply *structs.ServiceConfigResponse) error {
if done, err := c.srv.forward("ConfigEntry.ResolveServiceConfig", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"config_entry", "resolve_service_config"}, time.Now())
// Fetch the ACL token, if any.
rule, err := c.srv.ResolveToken(args.Token)
if err != nil {
return err
}
if rule != nil && !rule.ServiceRead(args.Name) {
return acl.ErrPermissionDenied
}
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
// Pass the WatchSet to both the service and proxy config lookups. If either is updated
// during the blocking query, this function will be rerun and these state store lookups
// will both be current.
index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name)
if err != nil {
return err
}
serviceConf, ok := serviceEntry.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid service config type %T", serviceEntry)
}
_, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal)
if err != nil {
return err
}
proxyConf, ok := proxyEntry.(*structs.ProxyConfigEntry)
if !ok {
return fmt.Errorf("invalid proxy config type %T", serviceEntry)
}
// Resolve the service definition by overlaying the service config onto the global
// proxy config.
definition := structs.ServiceDefinition{
Name: args.Name,
}
if proxyConf != nil {
definition.Proxy = &structs.ConnectProxyConfig{
Config: proxyConf.Config,
}
}
if serviceConf != nil {
definition.Name = serviceConf.Name
}
reply.Index = index
reply.Definition = definition
return nil
})
}

View file

@ -0,0 +1,607 @@
package consul
import (
"os"
"testing"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
)
func TestConfigEntry_Apply(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
args := structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ServiceConfigEntry{
Name: "foo",
},
}
var out struct{}
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
state := s1.fsm.State()
_, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
require.NoError(err)
serviceConf, ok := entry.(*structs.ServiceConfigEntry)
require.True(ok)
require.Equal("foo", serviceConf.Name)
require.Equal(structs.ServiceDefaults, serviceConf.Kind)
}
func TestConfigEntry_Apply_ACLDeny(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// Create the ACL.
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTokenTypeClient,
Rules: `
service "foo" {
policy = "write"
}
operator = "write"
`,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
// This should fail since we don't have write perms for the "db" service.
args := structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ServiceConfigEntry{
Name: "db",
},
WriteRequest: structs.WriteRequest{Token: id},
}
var out struct{}
err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
}
// The "foo" service should work.
args.Entry = &structs.ServiceConfigEntry{
Name: "foo",
}
err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)
require.NoError(err)
state := s1.fsm.State()
_, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
require.NoError(err)
serviceConf, ok := entry.(*structs.ServiceConfigEntry)
require.True(ok)
require.Equal("foo", serviceConf.Name)
require.Equal(structs.ServiceDefaults, serviceConf.Kind)
// Try to update the global proxy args with the anonymous token - this should fail.
proxyArgs := structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ProxyConfigEntry{
Config: map[string]interface{}{
"foo": 1,
},
},
}
err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &proxyArgs, &out)
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
}
// Now with the privileged token.
proxyArgs.WriteRequest.Token = id
err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &proxyArgs, &out)
require.NoError(err)
}
func TestConfigEntry_Get(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
// Create a dummy service in the state store to look up.
entry := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
}
state := s1.fsm.State()
require.NoError(state.EnsureConfigEntry(1, entry))
args := structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
Name: "foo",
Datacenter: s1.config.Datacenter,
}
var out structs.IndexedConfigEntries
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out))
serviceConf, ok := out.Entries[0].(*structs.ServiceConfigEntry)
require.True(ok)
require.Equal("foo", serviceConf.Name)
require.Equal(structs.ServiceDefaults, serviceConf.Kind)
}
func TestConfigEntry_Get_ACLDeny(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// Create the ACL.
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTokenTypeClient,
Rules: `
service "foo" {
policy = "read"
}
operator = "read"
`,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
// Create some dummy service/proxy configs to be looked up.
state := s1.fsm.State()
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
}))
require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
}))
// This should fail since we don't have write perms for the "db" service.
args := structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
Name: "db",
Datacenter: s1.config.Datacenter,
QueryOptions: structs.QueryOptions{Token: id},
}
var out structs.IndexedConfigEntries
err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out)
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
}
// The "foo" service should work.
args.Name = "foo"
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out))
serviceConf, ok := out.Entries[0].(*structs.ServiceConfigEntry)
require.True(ok)
require.Equal("foo", serviceConf.Name)
require.Equal(structs.ServiceDefaults, serviceConf.Kind)
}
func TestConfigEntry_List(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
// Create some dummy services in the state store to look up.
state := s1.fsm.State()
expected := structs.IndexedConfigEntries{
Entries: []structs.ConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "bar",
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
},
},
}
require.NoError(state.EnsureConfigEntry(1, expected.Entries[0]))
require.NoError(state.EnsureConfigEntry(2, expected.Entries[1]))
args := structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
Datacenter: "dc1",
}
var out structs.IndexedConfigEntries
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.List", &args, &out))
expected.Kind = structs.ServiceDefaults
expected.QueryMeta = out.QueryMeta
require.Equal(expected, out)
}
func TestConfigEntry_List_ACLDeny(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// Create the ACL.
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTokenTypeClient,
Rules: `
service "foo" {
policy = "read"
}
operator = "read"
`,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
// Create some dummy service/proxy configs to be looked up.
state := s1.fsm.State()
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
}))
require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
}))
require.NoError(state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "db",
}))
// This should filter out the "db" service since we don't have permissions for it.
args := structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
Datacenter: s1.config.Datacenter,
QueryOptions: structs.QueryOptions{Token: id},
}
var out structs.IndexedConfigEntries
err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.List", &args, &out)
require.NoError(err)
serviceConf, ok := out.Entries[0].(*structs.ServiceConfigEntry)
require.Len(out.Entries, 1)
require.True(ok)
require.Equal("foo", serviceConf.Name)
require.Equal(structs.ServiceDefaults, serviceConf.Kind)
// Get the global proxy config.
args.Kind = structs.ProxyDefaults
err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.List", &args, &out)
require.NoError(err)
proxyConf, ok := out.Entries[0].(*structs.ProxyConfigEntry)
require.Len(out.Entries, 1)
require.True(ok)
require.Equal(structs.ProxyConfigGlobal, proxyConf.Name)
require.Equal(structs.ProxyDefaults, proxyConf.Kind)
}
func TestConfigEntry_Delete(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
// Create a dummy service in the state store to look up.
entry := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
}
state := s1.fsm.State()
require.NoError(state.EnsureConfigEntry(1, entry))
// Verify it's there.
_, existing, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
require.NoError(err)
serviceConf, ok := existing.(*structs.ServiceConfigEntry)
require.True(ok)
require.Equal("foo", serviceConf.Name)
require.Equal(structs.ServiceDefaults, serviceConf.Kind)
args := structs.ConfigEntryRequest{
Datacenter: "dc1",
}
args.Entry = entry
var out struct{}
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out))
// Verify the entry was deleted.
_, existing, err = state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
require.NoError(err)
require.Nil(existing)
}
func TestConfigEntry_Delete_ACLDeny(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// Create the ACL.
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTokenTypeClient,
Rules: `
service "foo" {
policy = "write"
}
operator = "write"
`,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
// Create some dummy service/proxy configs to be looked up.
state := s1.fsm.State()
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
}))
require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
}))
// This should fail since we don't have write perms for the "db" service.
args := structs.ConfigEntryRequest{
Datacenter: s1.config.Datacenter,
Entry: &structs.ServiceConfigEntry{
Name: "db",
},
WriteRequest: structs.WriteRequest{Token: id},
}
var out struct{}
err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out)
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
}
// The "foo" service should work.
args.Entry = &structs.ServiceConfigEntry{
Name: "foo",
}
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out))
// Verify the entry was deleted.
_, existing, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
require.NoError(err)
require.Nil(existing)
// Try to delete the global proxy config without a token.
args = structs.ConfigEntryRequest{
Datacenter: s1.config.Datacenter,
Entry: &structs.ProxyConfigEntry{
Name: structs.ProxyConfigGlobal,
},
}
err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out)
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
}
// Now delete with a valid token.
args.WriteRequest.Token = id
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out))
_, existing, err = state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
require.NoError(err)
require.Nil(existing)
}
func TestConfigEntry_ResolveServiceConfig(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
// Create a dummy proxy/service config in the state store to look up.
state := s1.fsm.State()
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"foo": "bar",
},
}))
require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
}))
args := structs.ServiceConfigRequest{
Name: "foo",
Datacenter: s1.config.Datacenter,
}
var out structs.ServiceConfigResponse
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out))
expected := structs.ServiceDefinition{
Name: "foo",
Proxy: &structs.ConnectProxyConfig{
Config: map[string]interface{}{
"foo": "bar",
},
},
}
out.Definition.Proxy.Config["foo"] = structs.Uint8ToString(out.Definition.Proxy.Config["foo"].([]uint8))
require.Equal(expected, out.Definition)
}
func TestConfigEntry_ResolveServiceConfig_ACLDeny(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// Create the ACL.
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTokenTypeClient,
Rules: `
service "foo" {
policy = "write"
}
operator = "write"
`,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
// Create some dummy service/proxy configs to be looked up.
state := s1.fsm.State()
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
}))
require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
}))
require.NoError(state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "db",
}))
// This should fail since we don't have write perms for the "db" service.
args := structs.ServiceConfigRequest{
Name: "db",
Datacenter: s1.config.Datacenter,
QueryOptions: structs.QueryOptions{Token: id},
}
var out struct{}
err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out)
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
}
// The "foo" service should work.
args.Name = "foo"
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out))
}

View file

@ -1390,7 +1390,7 @@ func TestFSM_ConfigEntry(t *testing.T) {
// Verify it's in the state store. // Verify it's in the state store.
{ {
_, config, err := fsm.state.ConfigEntry(structs.ProxyDefaults, "global") _, config, err := fsm.state.ConfigEntry(nil, structs.ProxyDefaults, "global")
require.NoError(err) require.NoError(err)
entry.RaftIndex.CreateIndex = 1 entry.RaftIndex.CreateIndex = 1
entry.RaftIndex.ModifyIndex = 1 entry.RaftIndex.ModifyIndex = 1

View file

@ -403,11 +403,11 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
assert.Equal(caConfig, caConf) assert.Equal(caConfig, caConf)
// Verify config entries are restored // Verify config entries are restored
_, serviceConfEntry, err := fsm2.state.ConfigEntry(structs.ServiceDefaults, "foo") _, serviceConfEntry, err := fsm2.state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
require.NoError(err) require.NoError(err)
assert.Equal(serviceConfig, serviceConfEntry) assert.Equal(serviceConfig, serviceConfEntry)
_, proxyConfEntry, err := fsm2.state.ConfigEntry(structs.ProxyDefaults, "global") _, proxyConfEntry, err := fsm2.state.ConfigEntry(nil, structs.ProxyDefaults, "global")
require.NoError(err) require.NoError(err)
assert.Equal(proxyConfig, proxyConfEntry) assert.Equal(proxyConfig, proxyConfEntry)

View file

@ -4,6 +4,7 @@ func init() {
registerEndpoint(func(s *Server) interface{} { return &ACL{s} }) registerEndpoint(func(s *Server) interface{} { return &ACL{s} })
registerEndpoint(func(s *Server) interface{} { return &Catalog{s} }) registerEndpoint(func(s *Server) interface{} { return &Catalog{s} })
registerEndpoint(func(s *Server) interface{} { return NewCoordinate(s) }) registerEndpoint(func(s *Server) interface{} { return NewCoordinate(s) })
registerEndpoint(func(s *Server) interface{} { return &ConfigEntry{s} })
registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s} }) registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s} })
registerEndpoint(func(s *Server) interface{} { return &Health{s} }) registerEndpoint(func(s *Server) interface{} { return &Health{s} })
registerEndpoint(func(s *Server) interface{} { return &Intention{s} }) registerEndpoint(func(s *Server) interface{} { return &Intention{s} })

View file

@ -37,7 +37,7 @@ func configTableSchema() *memdb.TableSchema {
"kind": &memdb.IndexSchema{ "kind": &memdb.IndexSchema{
Name: "kind", Name: "kind",
AllowMissing: false, AllowMissing: false,
Unique: true, Unique: false,
Indexer: &memdb.StringFieldIndex{ Indexer: &memdb.StringFieldIndex{
Field: "Kind", Field: "Kind",
Lowercase: true, Lowercase: true,
@ -80,7 +80,7 @@ func (s *Restore) ConfigEntry(c structs.ConfigEntry) error {
} }
// ConfigEntry is called to get a given config entry. // ConfigEntry is called to get a given config entry.
func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) { func (s *Store) ConfigEntry(ws memdb.WatchSet, kind, name string) (uint64, structs.ConfigEntry, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
@ -88,13 +88,14 @@ func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, err
idx := maxIndexTxn(tx, configTableName) idx := maxIndexTxn(tx, configTableName)
// Get the existing config entry. // Get the existing config entry.
existing, err := tx.First(configTableName, "id", kind, name) watchCh, existing, err := tx.FirstWatch(configTableName, "id", kind, name)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) return 0, nil, fmt.Errorf("failed config entry lookup: %s", err)
} }
if existing == nil { if existing == nil {
return idx, nil, nil return idx, nil, nil
} }
ws.Add(watchCh)
conf, ok := existing.(structs.ConfigEntry) conf, ok := existing.(structs.ConfigEntry)
if !ok { if !ok {
@ -105,13 +106,13 @@ func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, err
} }
// ConfigEntries is called to get all config entry objects. // ConfigEntries is called to get all config entry objects.
func (s *Store) ConfigEntries() (uint64, []structs.ConfigEntry, error) { func (s *Store) ConfigEntries(ws memdb.WatchSet) (uint64, []structs.ConfigEntry, error) {
return s.ConfigEntriesByKind("") return s.ConfigEntriesByKind(nil, "")
} }
// ConfigEntriesByKind is called to get all config entry objects with the given kind. // ConfigEntriesByKind is called to get all config entry objects with the given kind.
// If kind is empty, all config entries will be returned. // If kind is empty, all config entries will be returned.
func (s *Store) ConfigEntriesByKind(kind string) (uint64, []structs.ConfigEntry, error) { func (s *Store) ConfigEntriesByKind(ws memdb.WatchSet, kind string) (uint64, []structs.ConfigEntry, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
@ -129,6 +130,7 @@ func (s *Store) ConfigEntriesByKind(kind string) (uint64, []structs.ConfigEntry,
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) return 0, nil, fmt.Errorf("failed config entry lookup: %s", err)
} }
ws.Add(iter.WatchCh())
var results []structs.ConfigEntry var results []structs.ConfigEntry
for v := iter.Next(); v != nil; v = iter.Next() { for v := iter.Next(); v != nil; v = iter.Next() {
@ -218,7 +220,7 @@ func (s *Store) DeleteConfigEntry(idx uint64, kind, name string) error {
tx := s.db.Txn(true) tx := s.db.Txn(true)
defer tx.Abort() defer tx.Abort()
// Try to retrieve the existing health check. // Try to retrieve the existing config entry.
existing, err := tx.First(configTableName, "id", kind, name) existing, err := tx.First(configTableName, "id", kind, name)
if err != nil { if err != nil {
return fmt.Errorf("failed config entry lookup: %s", err) return fmt.Errorf("failed config entry lookup: %s", err)

View file

@ -4,6 +4,7 @@ import (
"testing" "testing"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -22,7 +23,7 @@ func TestStore_ConfigEntry(t *testing.T) {
// Create // Create
require.NoError(s.EnsureConfigEntry(0, expected)) require.NoError(s.EnsureConfigEntry(0, expected))
idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") idx, config, err := s.ConfigEntry(nil, structs.ProxyDefaults, "global")
require.NoError(err) require.NoError(err)
require.Equal(uint64(0), idx) require.Equal(uint64(0), idx)
require.Equal(expected, config) require.Equal(expected, config)
@ -37,7 +38,7 @@ func TestStore_ConfigEntry(t *testing.T) {
} }
require.NoError(s.EnsureConfigEntry(1, updated)) require.NoError(s.EnsureConfigEntry(1, updated))
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") idx, config, err = s.ConfigEntry(nil, structs.ProxyDefaults, "global")
require.NoError(err) require.NoError(err)
require.Equal(uint64(1), idx) require.Equal(uint64(1), idx)
require.Equal(updated, config) require.Equal(updated, config)
@ -45,10 +46,30 @@ func TestStore_ConfigEntry(t *testing.T) {
// Delete // Delete
require.NoError(s.DeleteConfigEntry(2, structs.ProxyDefaults, "global")) require.NoError(s.DeleteConfigEntry(2, structs.ProxyDefaults, "global"))
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") idx, config, err = s.ConfigEntry(nil, structs.ProxyDefaults, "global")
require.NoError(err) require.NoError(err)
require.Equal(uint64(2), idx) require.Equal(uint64(2), idx)
require.Nil(config) require.Nil(config)
// Set up a watch.
serviceConf := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
}
require.NoError(s.EnsureConfigEntry(3, serviceConf))
ws := memdb.NewWatchSet()
_, _, err = s.ConfigEntry(ws, structs.ServiceDefaults, "foo")
require.NoError(err)
// Make an unrelated modification and make sure the watch doesn't fire.
require.NoError(s.EnsureConfigEntry(4, updated))
require.False(watchFired(ws))
// Update the watched config and make sure it fires.
serviceConf.Protocol = "http"
require.NoError(s.EnsureConfigEntry(5, serviceConf))
require.True(watchFired(ws))
} }
func TestStore_ConfigEntryCAS(t *testing.T) { func TestStore_ConfigEntryCAS(t *testing.T) {
@ -66,7 +87,7 @@ func TestStore_ConfigEntryCAS(t *testing.T) {
// Create // Create
require.NoError(s.EnsureConfigEntry(1, expected)) require.NoError(s.EnsureConfigEntry(1, expected))
idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") idx, config, err := s.ConfigEntry(nil, structs.ProxyDefaults, "global")
require.NoError(err) require.NoError(err)
require.Equal(uint64(1), idx) require.Equal(uint64(1), idx)
require.Equal(expected, config) require.Equal(expected, config)
@ -84,7 +105,7 @@ func TestStore_ConfigEntryCAS(t *testing.T) {
require.NoError(err) require.NoError(err)
// Entry should not be changed // Entry should not be changed
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") idx, config, err = s.ConfigEntry(nil, structs.ProxyDefaults, "global")
require.NoError(err) require.NoError(err)
require.Equal(uint64(1), idx) require.Equal(uint64(1), idx)
require.Equal(expected, config) require.Equal(expected, config)
@ -95,7 +116,7 @@ func TestStore_ConfigEntryCAS(t *testing.T) {
require.NoError(err) require.NoError(err)
// Entry should be updated // Entry should be updated
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") idx, config, err = s.ConfigEntry(nil, structs.ProxyDefaults, "global")
require.NoError(err) require.NoError(err)
require.Equal(uint64(2), idx) require.Equal(uint64(2), idx)
require.Equal(updated, config) require.Equal(updated, config)
@ -114,25 +135,42 @@ func TestStore_ConfigEntries(t *testing.T) {
Kind: structs.ServiceDefaults, Kind: structs.ServiceDefaults,
Name: "test2", Name: "test2",
} }
entry3 := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "test3",
}
require.NoError(s.EnsureConfigEntry(0, entry1)) require.NoError(s.EnsureConfigEntry(0, entry1))
require.NoError(s.EnsureConfigEntry(1, entry2)) require.NoError(s.EnsureConfigEntry(1, entry2))
require.NoError(s.EnsureConfigEntry(2, entry3))
// Get all entries // Get all entries
idx, entries, err := s.ConfigEntries() idx, entries, err := s.ConfigEntries(nil)
require.NoError(err) require.NoError(err)
require.Equal(uint64(1), idx) require.Equal(uint64(2), idx)
require.Equal([]structs.ConfigEntry{entry1, entry2}, entries) require.Equal([]structs.ConfigEntry{entry1, entry2, entry3}, entries)
// Get all proxy entries // Get all proxy entries
idx, entries, err = s.ConfigEntriesByKind(structs.ProxyDefaults) idx, entries, err = s.ConfigEntriesByKind(nil, structs.ProxyDefaults)
require.NoError(err) require.NoError(err)
require.Equal(uint64(1), idx) require.Equal(uint64(2), idx)
require.Equal([]structs.ConfigEntry{entry1}, entries) require.Equal([]structs.ConfigEntry{entry1}, entries)
// Get all service entries // Get all service entries
idx, entries, err = s.ConfigEntriesByKind(structs.ServiceDefaults) ws := memdb.NewWatchSet()
idx, entries, err = s.ConfigEntriesByKind(ws, structs.ServiceDefaults)
require.NoError(err) require.NoError(err)
require.Equal(uint64(1), idx) require.Equal(uint64(2), idx)
require.Equal([]structs.ConfigEntry{entry2}, entries) require.Equal([]structs.ConfigEntry{entry2, entry3}, entries)
// Watch should not have fired
require.False(watchFired(ws))
// Now make an update and make sure the watch fires.
require.NoError(s.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "test2",
Protocol: "tcp",
}))
require.True(watchFired(ws))
} }

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-msgpack/codec"
) )
@ -25,17 +26,21 @@ type ConfigEntry interface {
Normalize() error Normalize() error
Validate() error Validate() error
// CanRead and CanWrite return whether or not the given Authorizer
// has permission to read or write to the config entry, respectively.
CanRead(acl.Authorizer) bool
CanWrite(acl.Authorizer) bool
GetRaftIndex() *RaftIndex GetRaftIndex() *RaftIndex
} }
// ServiceConfiguration is the top-level struct for the configuration of a service // ServiceConfiguration is the top-level struct for the configuration of a service
// across the entire cluster. // across the entire cluster.
type ServiceConfigEntry struct { type ServiceConfigEntry struct {
Kind string Kind string
Name string Name string
Protocol string Protocol string
Connect ConnectConfiguration Connect ConnectConfiguration
ServiceDefinitionDefaults ServiceDefinitionDefaults
RaftIndex RaftIndex
} }
@ -71,6 +76,14 @@ func (e *ServiceConfigEntry) Validate() error {
return nil return nil
} }
func (e *ServiceConfigEntry) CanRead(rule acl.Authorizer) bool {
return rule.ServiceRead(e.Name)
}
func (e *ServiceConfigEntry) CanWrite(rule acl.Authorizer) bool {
return rule.ServiceWrite(e.Name, nil)
}
func (e *ServiceConfigEntry) GetRaftIndex() *RaftIndex { func (e *ServiceConfigEntry) GetRaftIndex() *RaftIndex {
if e == nil { if e == nil {
return &RaftIndex{} return &RaftIndex{}
@ -83,26 +96,6 @@ type ConnectConfiguration struct {
SidecarProxy bool SidecarProxy bool
} }
type ServiceDefinitionDefaults struct {
EnableTagOverride bool
// Non script/docker checks only
Check *HealthCheck
Checks HealthChecks
// Kind is allowed to accommodate non-sidecar proxies but it will be an error
// if they also set Connect.DestinationServiceID since sidecars are
// configured via their associated service's config.
Kind ServiceKind
// Only DestinationServiceName and Config are supported.
Proxy ConnectProxyConfig
Connect ServiceConnect
Weights Weights
}
// ProxyConfigEntry is the top-level struct for global proxy configuration defaults. // ProxyConfigEntry is the top-level struct for global proxy configuration defaults.
type ProxyConfigEntry struct { type ProxyConfigEntry struct {
Kind string Kind string
@ -130,6 +123,7 @@ func (e *ProxyConfigEntry) Normalize() error {
} }
e.Kind = ProxyDefaults e.Kind = ProxyDefaults
e.Name = ProxyConfigGlobal
return nil return nil
} }
@ -146,6 +140,14 @@ func (e *ProxyConfigEntry) Validate() error {
return nil return nil
} }
func (e *ProxyConfigEntry) CanRead(rule acl.Authorizer) bool {
return true
}
func (e *ProxyConfigEntry) CanWrite(rule acl.Authorizer) bool {
return rule.OperatorWrite()
}
func (e *ProxyConfigEntry) GetRaftIndex() *RaftIndex { func (e *ProxyConfigEntry) GetRaftIndex() *RaftIndex {
if e == nil { if e == nil {
return &RaftIndex{} return &RaftIndex{}
@ -161,18 +163,26 @@ const (
ConfigEntryDelete ConfigEntryOp = "delete" ConfigEntryDelete ConfigEntryOp = "delete"
) )
// ConfigEntryRequest is used when creating/updating/deleting a ConfigEntry.
type ConfigEntryRequest struct { type ConfigEntryRequest struct {
Op ConfigEntryOp Op ConfigEntryOp
Entry ConfigEntry Datacenter string
Entry ConfigEntry
WriteRequest
} }
func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) { func (c *ConfigEntryRequest) RequestDatacenter() string {
return c.Datacenter
}
func (c *ConfigEntryRequest) MarshalBinary() (data []byte, err error) {
// bs will grow if needed but allocate enough to avoid reallocation in common // bs will grow if needed but allocate enough to avoid reallocation in common
// case. // case.
bs := make([]byte, 128) bs := make([]byte, 128)
enc := codec.NewEncoderBytes(&bs, msgpackHandle) enc := codec.NewEncoderBytes(&bs, msgpackHandle)
// Encode kind first // Encode kind first
err = enc.Encode(r.Entry.GetKind()) err = enc.Encode(c.Entry.GetKind())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -181,7 +191,7 @@ func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) {
err = enc.Encode(struct { err = enc.Encode(struct {
*Alias *Alias
}{ }{
Alias: (*Alias)(r), Alias: (*Alias)(c),
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -189,7 +199,7 @@ func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) {
return bs, nil return bs, nil
} }
func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error { func (c *ConfigEntryRequest) UnmarshalBinary(data []byte) error {
// First decode the kind prefix // First decode the kind prefix
var kind string var kind string
dec := codec.NewDecoderBytes(data, msgpackHandle) dec := codec.NewDecoderBytes(data, msgpackHandle)
@ -198,11 +208,11 @@ func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error {
} }
// Then decode the real thing with appropriate kind of ConfigEntry // Then decode the real thing with appropriate kind of ConfigEntry
entry, err := makeConfigEntry(kind) entry, err := MakeConfigEntry(kind, "")
if err != nil { if err != nil {
return err return err
} }
r.Entry = entry c.Entry = entry
// Alias juggling to prevent infinite recursive calls back to this decode // Alias juggling to prevent infinite recursive calls back to this decode
// method. // method.
@ -210,7 +220,7 @@ func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error {
as := struct { as := struct {
*Alias *Alias
}{ }{
Alias: (*Alias)(r), Alias: (*Alias)(c),
} }
if err := dec.Decode(&as); err != nil { if err := dec.Decode(&as); err != nil {
return err return err
@ -218,13 +228,45 @@ func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error {
return nil return nil
} }
func makeConfigEntry(kind string) (ConfigEntry, error) { func MakeConfigEntry(kind, name string) (ConfigEntry, error) {
switch kind { switch kind {
case ServiceDefaults: case ServiceDefaults:
return &ServiceConfigEntry{}, nil return &ServiceConfigEntry{Name: name}, nil
case ProxyDefaults: case ProxyDefaults:
return &ProxyConfigEntry{}, nil return &ProxyConfigEntry{Name: name}, nil
default: default:
return nil, fmt.Errorf("invalid config entry kind: %s", kind) return nil, fmt.Errorf("invalid config entry kind: %s", kind)
} }
} }
// ConfigEntryQuery is used when requesting info about a config entry.
type ConfigEntryQuery struct {
Kind string
Name string
Datacenter string
QueryOptions
}
func (c *ConfigEntryQuery) RequestDatacenter() string {
return c.Datacenter
}
// ServiceConfigRequest is used when requesting the resolved configuration
// for a service.
type ServiceConfigRequest struct {
Name string
Datacenter string
QueryOptions
}
func (s *ServiceConfigRequest) RequestDatacenter() string {
return s.Datacenter
}
type ServiceConfigResponse struct {
Definition ServiceDefinition
QueryMeta
}

View file

@ -1149,6 +1149,83 @@ type IndexedNodeDump struct {
QueryMeta QueryMeta
} }
// IndexedConfigEntries has its own encoding logic which differs from
// ConfigEntryRequest as it has to send a slice of ConfigEntry.
type IndexedConfigEntries struct {
Kind string
Entries []ConfigEntry
QueryMeta
}
func (c *IndexedConfigEntries) MarshalBinary() (data []byte, err error) {
// bs will grow if needed but allocate enough to avoid reallocation in common
// case.
bs := make([]byte, 128)
enc := codec.NewEncoderBytes(&bs, msgpackHandle)
// Encode length.
err = enc.Encode(len(c.Entries))
if err != nil {
return nil, err
}
// Encode kind.
err = enc.Encode(c.Kind)
if err != nil {
return nil, err
}
// Then actual value using alias trick to avoid infinite recursion
type Alias IndexedConfigEntries
err = enc.Encode(struct {
*Alias
}{
Alias: (*Alias)(c),
})
if err != nil {
return nil, err
}
return bs, nil
}
func (c *IndexedConfigEntries) UnmarshalBinary(data []byte) error {
// First decode the number of entries.
var numEntries int
dec := codec.NewDecoderBytes(data, msgpackHandle)
if err := dec.Decode(&numEntries); err != nil {
return err
}
// Next decode the kind.
var kind string
if err := dec.Decode(&kind); err != nil {
return err
}
// Then decode the slice of ConfigEntries
c.Entries = make([]ConfigEntry, numEntries)
for i := 0; i < numEntries; i++ {
entry, err := MakeConfigEntry(kind, "")
if err != nil {
return err
}
c.Entries[i] = entry
}
// Alias juggling to prevent infinite recursive calls back to this decode
// method.
type Alias IndexedConfigEntries
as := struct {
*Alias
}{
Alias: (*Alias)(c),
}
if err := dec.Decode(&as); err != nil {
return err
}
return nil
}
// DirEntry is used to represent a directory entry. This is // DirEntry is used to represent a directory entry. This is
// used for values in our Key-Value store. // used for values in our Key-Value store.
type DirEntry struct { type DirEntry struct {