From 81254deb59271ea7cd76866600a6113ec14d03f6 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Sat, 6 Apr 2019 23:38:08 -0700 Subject: [PATCH] Add RPC endpoints for config entry operations --- agent/consul/config_endpoint.go | 266 ++++++++++ agent/consul/config_endpoint_test.go | 619 ++++++++++++++++++++++++ agent/consul/fsm/commands_oss_test.go | 2 +- agent/consul/fsm/snapshot_oss_test.go | 4 +- agent/consul/server_oss.go | 1 + agent/consul/state/config_entry.go | 16 +- agent/consul/state/config_entry_test.go | 66 ++- agent/structs/config_entry.go | 86 ++-- agent/structs/structs.go | 76 +++ 9 files changed, 1079 insertions(+), 57 deletions(-) create mode 100644 agent/consul/config_endpoint.go create mode 100644 agent/consul/config_endpoint_test.go diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go new file mode 100644 index 000000000..ee5d1b9b2 --- /dev/null +++ b/agent/consul/config_endpoint.go @@ -0,0 +1,266 @@ +package consul + +import ( + "fmt" + "time" + + metrics "github.com/armon/go-metrics" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" +) + +// The ConfigEntry endpoint is used to query centralized config information +type ConfigEntry struct { + srv *Server +} + +// Apply does an upsert of the given config entry. +func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *struct{}) error { + if done, err := c.srv.forward("ConfigEntry.Apply", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"config_entry", "apply"}, time.Now()) + + // Normalize and validate the incoming config entry. + if err := args.Entry.Normalize(); err != nil { + return err + } + if err := args.Entry.Validate(); err != nil { + return err + } + + // Fetch the ACL token, if any. + rule, err := c.srv.ResolveToken(args.Token) + if err != nil { + return err + } + if err := verifyConfigWriteACL(rule, args.Entry.GetKind(), args.Entry.GetName()); err != nil { + return err + } + + args.Op = structs.ConfigEntryUpsert + resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args) + if err != nil { + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + return nil +} + +// Get returns a single config entry by Kind/Name. +func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.IndexedConfigEntries) error { + if done, err := c.srv.forward("ConfigEntry.Get", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"config_entry", "get"}, time.Now()) + + // Fetch the ACL token, if any. + rule, err := c.srv.ResolveToken(args.Token) + if err != nil { + return err + } + if err := verifyConfigReadACL(rule, args.Kind, args.Name); err != nil { + return err + } + + return c.srv.blockingQuery( + &args.QueryOptions, + &reply.QueryMeta, + func(ws memdb.WatchSet, state *state.Store) error { + index, entry, err := state.ConfigEntry(ws, args.Kind, args.Name) + if err != nil { + return err + } + + reply.Index = index + reply.Entries = []structs.ConfigEntry{entry} + return nil + }) +} + +// List returns all the config entries of the given kind. If Kind is blank, +// all existing config entries will be returned. +func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.IndexedConfigEntries) error { + if done, err := c.srv.forward("ConfigEntry.List", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"config_entry", "list"}, time.Now()) + + // Fetch the ACL token, if any. + rule, err := c.srv.ResolveToken(args.Token) + if err != nil { + return err + } + + return c.srv.blockingQuery( + &args.QueryOptions, + &reply.QueryMeta, + func(ws memdb.WatchSet, state *state.Store) error { + index, entries, err := state.ConfigEntriesByKind(ws, args.Kind) + if err != nil { + return err + } + + // Filter the entries returned by ACL permissions. + // TODO(kyhavlov): should we handle the proxy config differently here since + // it's a singleton? + filteredEntries := make([]structs.ConfigEntry, 0, len(entries)) + for _, entry := range entries { + if err := verifyConfigReadACL(rule, entry.GetKind(), entry.GetName()); err != nil { + if acl.IsErrPermissionDenied(err) { + continue + } else { + return err + } + } + filteredEntries = append(filteredEntries, entry) + } + + reply.Index = index + reply.Entries = filteredEntries + return nil + }) +} + +// Delete deletes a config entry. +func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{}) error { + if done, err := c.srv.forward("ConfigEntry.Delete", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"config_entry", "delete"}, time.Now()) + + // Normalize the incoming entry. + if err := args.Entry.Normalize(); err != nil { + return err + } + + // Fetch the ACL token, if any. + rule, err := c.srv.ResolveToken(args.Token) + if err != nil { + return err + } + if err := verifyConfigWriteACL(rule, args.Entry.GetKind(), args.Entry.GetName()); err != nil { + return err + } + + args.Op = structs.ConfigEntryDelete + resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args) + if err != nil { + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + return nil +} + +// ResolveServiceConfig +func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, reply *structs.ServiceConfigResponse) error { + if done, err := c.srv.forward("ConfigEntry.ResolveServiceConfig", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"config_entry", "resolve_service_config"}, time.Now()) + + // Fetch the ACL token, if any. + rule, err := c.srv.ResolveToken(args.Token) + if err != nil { + return err + } + if rule != nil && !rule.ServiceRead(args.Name) { + return acl.ErrPermissionDenied + } + + return c.srv.blockingQuery( + &args.QueryOptions, + &reply.QueryMeta, + func(ws memdb.WatchSet, state *state.Store) error { + // Pass the WatchSet to both the service and proxy config lookups. If either is updated + // during the blocking query, this function will be rerun and these state store lookups + // will both be current. + index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name) + if err != nil { + return err + } + serviceConf, ok := serviceEntry.(*structs.ServiceConfigEntry) + if !ok { + return fmt.Errorf("invalid service config type %T", serviceEntry) + } + + _, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal) + if err != nil { + return err + } + proxyConf, ok := proxyEntry.(*structs.ProxyConfigEntry) + if !ok { + return fmt.Errorf("invalid proxy config type %T", serviceEntry) + } + + // Resolve the service definition by overlaying the service config onto the global + // proxy config. + definition := structs.ServiceDefinition{ + Name: args.Name, + } + if proxyConf != nil { + definition.Proxy = &structs.ConnectProxyConfig{ + Config: proxyConf.Config, + } + } + if serviceConf != nil { + definition.Name = serviceConf.Name + } + + reply.Index = index + reply.Definition = definition + return nil + }) +} + +// verifyConfigReadACL checks whether the given ACL authorizer has permission +// to read the config entry of the given kind/name. +func verifyConfigReadACL(rule acl.Authorizer, kind, name string) error { + if rule == nil { + return nil + } + + switch kind { + case structs.ServiceDefaults: + if !rule.ServiceRead(name) { + return acl.ErrPermissionDenied + } + case structs.ProxyDefaults: + if !rule.OperatorRead() { + return acl.ErrPermissionDenied + } + default: + return fmt.Errorf("unknown config entry type %q", kind) + } + + return nil +} + +// verifyConfigWriteACL checks whether the given ACL authorizer has permission +// to update the config entry of the given kind/name. +func verifyConfigWriteACL(rule acl.Authorizer, kind, name string) error { + if rule == nil { + return nil + } + + switch kind { + case structs.ServiceDefaults: + if !rule.ServiceWrite(name, nil) { + return acl.ErrPermissionDenied + } + case structs.ProxyDefaults: + if !rule.OperatorWrite() { + return acl.ErrPermissionDenied + } + default: + return fmt.Errorf("unknown config entry type %q", kind) + } + + return nil +} diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go new file mode 100644 index 000000000..0b19c98a8 --- /dev/null +++ b/agent/consul/config_endpoint_test.go @@ -0,0 +1,619 @@ +package consul + +import ( + "os" + "testing" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/testrpc" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/stretchr/testify/require" +) + +func TestConfigEntry_Apply(t *testing.T) { + t.Parallel() + + require := require.New(t) + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + args := structs.ConfigEntryRequest{ + Datacenter: "dc1", + Entry: &structs.ServiceConfigEntry{ + Name: "foo", + }, + } + var out struct{} + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)) + + state := s1.fsm.State() + _, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo") + require.NoError(err) + + serviceConf, ok := entry.(*structs.ServiceConfigEntry) + require.True(ok) + require.Equal("foo", serviceConf.Name) + require.Equal(structs.ServiceDefaults, serviceConf.Kind) +} + +func TestConfigEntry_Apply_ACLDeny(t *testing.T) { + t.Parallel() + + require := require.New(t) + + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + codec := rpcClient(t, s1) + defer codec.Close() + + // Create the ACL. + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTokenTypeClient, + Rules: ` +service "foo" { + policy = "write" +} +operator = "write" +`, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var id string + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + + // This should fail since we don't have write perms for the "db" service. + args := structs.ConfigEntryRequest{ + Datacenter: "dc1", + Entry: &structs.ServiceConfigEntry{ + Name: "db", + }, + WriteRequest: structs.WriteRequest{Token: id}, + } + var out struct{} + err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out) + if !acl.IsErrPermissionDenied(err) { + t.Fatalf("err: %v", err) + } + + // The "foo" service should work. + args.Entry = &structs.ServiceConfigEntry{ + Name: "foo", + } + err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out) + require.NoError(err) + + state := s1.fsm.State() + _, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo") + require.NoError(err) + + serviceConf, ok := entry.(*structs.ServiceConfigEntry) + require.True(ok) + require.Equal("foo", serviceConf.Name) + require.Equal(structs.ServiceDefaults, serviceConf.Kind) + + // Try to update the global proxy args with the anonymous token - this should fail. + proxyArgs := structs.ConfigEntryRequest{ + Datacenter: "dc1", + Entry: &structs.ProxyConfigEntry{ + Config: map[string]interface{}{ + "foo": 1, + }, + }, + } + err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &proxyArgs, &out) + if !acl.IsErrPermissionDenied(err) { + t.Fatalf("err: %v", err) + } + + // Now with the privileged token. + proxyArgs.WriteRequest.Token = id + err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &proxyArgs, &out) + require.NoError(err) +} + +func TestConfigEntry_Get(t *testing.T) { + t.Parallel() + + require := require.New(t) + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // Create a dummy service in the state store to look up. + entry := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + } + state := s1.fsm.State() + require.NoError(state.EnsureConfigEntry(1, entry)) + + args := structs.ConfigEntryQuery{ + Kind: structs.ServiceDefaults, + Name: "foo", + Datacenter: s1.config.Datacenter, + } + var out structs.IndexedConfigEntries + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out)) + + serviceConf, ok := out.Entries[0].(*structs.ServiceConfigEntry) + require.True(ok) + require.Equal("foo", serviceConf.Name) + require.Equal(structs.ServiceDefaults, serviceConf.Kind) +} + +func TestConfigEntry_Get_ACLDeny(t *testing.T) { + t.Parallel() + + require := require.New(t) + + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + codec := rpcClient(t, s1) + defer codec.Close() + + // Create the ACL. + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTokenTypeClient, + Rules: ` +service "foo" { + policy = "read" +} +operator = "read" +`, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var id string + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + + // Create a dummy service in the state store to look up. + // Create some dummy service/proxy configs to be looked up. + state := s1.fsm.State() + require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + })) + require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + })) + + // This should fail since we don't have write perms for the "db" service. + args := structs.ConfigEntryQuery{ + Kind: structs.ServiceDefaults, + Name: "db", + Datacenter: s1.config.Datacenter, + QueryOptions: structs.QueryOptions{Token: id}, + } + var out structs.IndexedConfigEntries + err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out) + if !acl.IsErrPermissionDenied(err) { + t.Fatalf("err: %v", err) + } + + // The "foo" service should work. + args.Name = "foo" + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out)) + + serviceConf, ok := out.Entries[0].(*structs.ServiceConfigEntry) + require.True(ok) + require.Equal("foo", serviceConf.Name) + require.Equal(structs.ServiceDefaults, serviceConf.Kind) + + // Try to look up the proxy config with no token. + args.Kind = structs.ProxyDefaults + args.Name = structs.ProxyConfigGlobal + args.QueryOptions.Token = "" + err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out) + if !acl.IsErrPermissionDenied(err) { + t.Fatalf("err: %v", err) + } + + args.QueryOptions.Token = id + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out)) +} + +func TestConfigEntry_List(t *testing.T) { + t.Parallel() + + require := require.New(t) + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // Create some dummy services in the state store to look up. + state := s1.fsm.State() + expected := structs.IndexedConfigEntries{ + Entries: []structs.ConfigEntry{ + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "bar", + }, + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + }, + }, + } + require.NoError(state.EnsureConfigEntry(1, expected.Entries[0])) + require.NoError(state.EnsureConfigEntry(2, expected.Entries[1])) + + args := structs.ConfigEntryQuery{ + Kind: structs.ServiceDefaults, + Datacenter: "dc1", + } + var out structs.IndexedConfigEntries + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.List", &args, &out)) + + expected.QueryMeta = out.QueryMeta + require.Equal(expected, out) +} + +func TestConfigEntry_List_ACLDeny(t *testing.T) { + t.Parallel() + + require := require.New(t) + + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + codec := rpcClient(t, s1) + defer codec.Close() + + // Create the ACL. + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTokenTypeClient, + Rules: ` +service "foo" { + policy = "read" +} +operator = "read" +`, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var id string + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + + // Create some dummy service/proxy configs to be looked up. + state := s1.fsm.State() + require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + })) + require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + })) + require.NoError(state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "db", + })) + + // This should filter out the "db" service since we don't have permissions for it. + args := structs.ConfigEntryQuery{ + Kind: structs.ServiceDefaults, + Datacenter: s1.config.Datacenter, + QueryOptions: structs.QueryOptions{Token: id}, + } + var out structs.IndexedConfigEntries + err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.List", &args, &out) + require.NoError(err) + + serviceConf, ok := out.Entries[0].(*structs.ServiceConfigEntry) + require.Len(out.Entries, 1) + require.True(ok) + require.Equal("foo", serviceConf.Name) + require.Equal(structs.ServiceDefaults, serviceConf.Kind) + + // Get the global proxy config. + args.Kind = structs.ProxyDefaults + err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.List", &args, &out) + require.NoError(err) + + proxyConf, ok := out.Entries[0].(*structs.ProxyConfigEntry) + require.Len(out.Entries, 1) + require.True(ok) + require.Equal(structs.ProxyConfigGlobal, proxyConf.Name) + require.Equal(structs.ProxyDefaults, proxyConf.Kind) +} + +func TestConfigEntry_Delete(t *testing.T) { + t.Parallel() + + require := require.New(t) + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // Create a dummy service in the state store to look up. + entry := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + } + state := s1.fsm.State() + require.NoError(state.EnsureConfigEntry(1, entry)) + + // Verify it's there. + _, existing, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo") + require.NoError(err) + + serviceConf, ok := existing.(*structs.ServiceConfigEntry) + require.True(ok) + require.Equal("foo", serviceConf.Name) + require.Equal(structs.ServiceDefaults, serviceConf.Kind) + + args := structs.ConfigEntryRequest{ + Datacenter: "dc1", + } + args.Entry = entry + var out struct{} + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out)) + + // Verify the entry was deleted. + _, existing, err = state.ConfigEntry(nil, structs.ServiceDefaults, "foo") + require.NoError(err) + require.Nil(existing) +} + +func TestConfigEntry_Delete_ACLDeny(t *testing.T) { + t.Parallel() + + require := require.New(t) + + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + codec := rpcClient(t, s1) + defer codec.Close() + + // Create the ACL. + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTokenTypeClient, + Rules: ` +service "foo" { + policy = "write" +} +operator = "write" +`, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var id string + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + + // Create some dummy service/proxy configs to be looked up. + state := s1.fsm.State() + require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + })) + require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + })) + + // This should fail since we don't have write perms for the "db" service. + args := structs.ConfigEntryRequest{ + Datacenter: s1.config.Datacenter, + Entry: &structs.ServiceConfigEntry{ + Name: "db", + }, + WriteRequest: structs.WriteRequest{Token: id}, + } + var out struct{} + err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out) + if !acl.IsErrPermissionDenied(err) { + t.Fatalf("err: %v", err) + } + + // The "foo" service should work. + args.Entry = &structs.ServiceConfigEntry{ + Name: "foo", + } + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out)) + + // Verify the entry was deleted. + _, existing, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo") + require.NoError(err) + require.Nil(existing) + + // Try to delete the global proxy config without a token. + args = structs.ConfigEntryRequest{ + Datacenter: s1.config.Datacenter, + Entry: &structs.ProxyConfigEntry{ + Name: structs.ProxyConfigGlobal, + }, + } + err = msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out) + if !acl.IsErrPermissionDenied(err) { + t.Fatalf("err: %v", err) + } + + // Now delete with a valid token. + args.WriteRequest.Token = id + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out)) + + _, existing, err = state.ConfigEntry(nil, structs.ServiceDefaults, "foo") + require.NoError(err) + require.Nil(existing) +} + +func TestConfigEntry_ResolveServiceConfig(t *testing.T) { + t.Parallel() + + require := require.New(t) + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // Create a dummy proxy/service config in the state store to look up. + state := s1.fsm.State() + require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "foo": "bar", + }, + })) + require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + })) + + args := structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: s1.config.Datacenter, + } + var out structs.ServiceConfigResponse + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out)) + + expected := structs.ServiceDefinition{ + Name: "foo", + Proxy: &structs.ConnectProxyConfig{ + Config: map[string]interface{}{ + "foo": "bar", + }, + }, + } + out.Definition.Proxy.Config["foo"] = structs.Uint8ToString(out.Definition.Proxy.Config["foo"].([]uint8)) + require.Equal(expected, out.Definition) +} + +func TestConfigEntry_ResolveServiceConfig_ACLDeny(t *testing.T) { + t.Parallel() + + require := require.New(t) + + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + codec := rpcClient(t, s1) + defer codec.Close() + + // Create the ACL. + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTokenTypeClient, + Rules: ` +service "foo" { + policy = "write" +} +operator = "write" +`, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var id string + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + + // Create some dummy service/proxy configs to be looked up. + state := s1.fsm.State() + require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + })) + require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + })) + require.NoError(state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "db", + })) + + // This should fail since we don't have write perms for the "db" service. + args := structs.ServiceConfigRequest{ + Name: "db", + Datacenter: s1.config.Datacenter, + QueryOptions: structs.QueryOptions{Token: id}, + } + var out struct{} + err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out) + if !acl.IsErrPermissionDenied(err) { + t.Fatalf("err: %v", err) + } + + // The "foo" service should work. + args.Name = "foo" + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out)) + +} diff --git a/agent/consul/fsm/commands_oss_test.go b/agent/consul/fsm/commands_oss_test.go index d4e1998f1..3c5d2f4d1 100644 --- a/agent/consul/fsm/commands_oss_test.go +++ b/agent/consul/fsm/commands_oss_test.go @@ -1390,7 +1390,7 @@ func TestFSM_ConfigEntry(t *testing.T) { // Verify it's in the state store. { - _, config, err := fsm.state.ConfigEntry(structs.ProxyDefaults, "global") + _, config, err := fsm.state.ConfigEntry(nil, structs.ProxyDefaults, "global") require.NoError(err) entry.RaftIndex.CreateIndex = 1 entry.RaftIndex.ModifyIndex = 1 diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index fe1f3db95..b8b5bb132 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -403,11 +403,11 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { assert.Equal(caConfig, caConf) // Verify config entries are restored - _, serviceConfEntry, err := fsm2.state.ConfigEntry(structs.ServiceDefaults, "foo") + _, serviceConfEntry, err := fsm2.state.ConfigEntry(nil, structs.ServiceDefaults, "foo") require.NoError(err) assert.Equal(serviceConfig, serviceConfEntry) - _, proxyConfEntry, err := fsm2.state.ConfigEntry(structs.ProxyDefaults, "global") + _, proxyConfEntry, err := fsm2.state.ConfigEntry(nil, structs.ProxyDefaults, "global") require.NoError(err) assert.Equal(proxyConfig, proxyConfEntry) diff --git a/agent/consul/server_oss.go b/agent/consul/server_oss.go index 516cf39c5..1627d4059 100644 --- a/agent/consul/server_oss.go +++ b/agent/consul/server_oss.go @@ -4,6 +4,7 @@ func init() { registerEndpoint(func(s *Server) interface{} { return &ACL{s} }) registerEndpoint(func(s *Server) interface{} { return &Catalog{s} }) registerEndpoint(func(s *Server) interface{} { return NewCoordinate(s) }) + registerEndpoint(func(s *Server) interface{} { return &ConfigEntry{s} }) registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s} }) registerEndpoint(func(s *Server) interface{} { return &Health{s} }) registerEndpoint(func(s *Server) interface{} { return &Intention{s} }) diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index d4bdaea31..82eb80509 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -37,7 +37,7 @@ func configTableSchema() *memdb.TableSchema { "kind": &memdb.IndexSchema{ Name: "kind", AllowMissing: false, - Unique: true, + Unique: false, Indexer: &memdb.StringFieldIndex{ Field: "Kind", Lowercase: true, @@ -80,7 +80,7 @@ func (s *Restore) ConfigEntry(c structs.ConfigEntry) error { } // ConfigEntry is called to get a given config entry. -func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) { +func (s *Store) ConfigEntry(ws memdb.WatchSet, kind, name string) (uint64, structs.ConfigEntry, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -88,13 +88,14 @@ func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, err idx := maxIndexTxn(tx, configTableName) // Get the existing config entry. - existing, err := tx.First(configTableName, "id", kind, name) + watchCh, existing, err := tx.FirstWatch(configTableName, "id", kind, name) if err != nil { return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) } if existing == nil { return idx, nil, nil } + ws.Add(watchCh) conf, ok := existing.(structs.ConfigEntry) if !ok { @@ -105,13 +106,13 @@ func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, err } // ConfigEntries is called to get all config entry objects. -func (s *Store) ConfigEntries() (uint64, []structs.ConfigEntry, error) { - return s.ConfigEntriesByKind("") +func (s *Store) ConfigEntries(ws memdb.WatchSet) (uint64, []structs.ConfigEntry, error) { + return s.ConfigEntriesByKind(nil, "") } // ConfigEntriesByKind is called to get all config entry objects with the given kind. // If kind is empty, all config entries will be returned. -func (s *Store) ConfigEntriesByKind(kind string) (uint64, []structs.ConfigEntry, error) { +func (s *Store) ConfigEntriesByKind(ws memdb.WatchSet, kind string) (uint64, []structs.ConfigEntry, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -129,6 +130,7 @@ func (s *Store) ConfigEntriesByKind(kind string) (uint64, []structs.ConfigEntry, if err != nil { return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) } + ws.Add(iter.WatchCh()) var results []structs.ConfigEntry for v := iter.Next(); v != nil; v = iter.Next() { @@ -218,7 +220,7 @@ func (s *Store) DeleteConfigEntry(idx uint64, kind, name string) error { tx := s.db.Txn(true) defer tx.Abort() - // Try to retrieve the existing health check. + // Try to retrieve the existing config entry. existing, err := tx.First(configTableName, "id", kind, name) if err != nil { return fmt.Errorf("failed config entry lookup: %s", err) diff --git a/agent/consul/state/config_entry_test.go b/agent/consul/state/config_entry_test.go index bf74cf699..7824c8152 100644 --- a/agent/consul/state/config_entry_test.go +++ b/agent/consul/state/config_entry_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" "github.com/stretchr/testify/require" ) @@ -22,7 +23,7 @@ func TestStore_ConfigEntry(t *testing.T) { // Create require.NoError(s.EnsureConfigEntry(0, expected)) - idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") + idx, config, err := s.ConfigEntry(nil, structs.ProxyDefaults, "global") require.NoError(err) require.Equal(uint64(0), idx) require.Equal(expected, config) @@ -37,7 +38,7 @@ func TestStore_ConfigEntry(t *testing.T) { } require.NoError(s.EnsureConfigEntry(1, updated)) - idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + idx, config, err = s.ConfigEntry(nil, structs.ProxyDefaults, "global") require.NoError(err) require.Equal(uint64(1), idx) require.Equal(updated, config) @@ -45,10 +46,30 @@ func TestStore_ConfigEntry(t *testing.T) { // Delete require.NoError(s.DeleteConfigEntry(2, structs.ProxyDefaults, "global")) - idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + idx, config, err = s.ConfigEntry(nil, structs.ProxyDefaults, "global") require.NoError(err) require.Equal(uint64(2), idx) require.Nil(config) + + // Set up a watch. + serviceConf := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + } + require.NoError(s.EnsureConfigEntry(3, serviceConf)) + + ws := memdb.NewWatchSet() + _, _, err = s.ConfigEntry(ws, structs.ServiceDefaults, "foo") + require.NoError(err) + + // Make an unrelated modification and make sure the watch doesn't fire. + require.NoError(s.EnsureConfigEntry(4, updated)) + require.False(watchFired(ws)) + + // Update the watched config and make sure it fires. + serviceConf.Protocol = "http" + require.NoError(s.EnsureConfigEntry(5, serviceConf)) + require.True(watchFired(ws)) } func TestStore_ConfigEntryCAS(t *testing.T) { @@ -66,7 +87,7 @@ func TestStore_ConfigEntryCAS(t *testing.T) { // Create require.NoError(s.EnsureConfigEntry(1, expected)) - idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") + idx, config, err := s.ConfigEntry(nil, structs.ProxyDefaults, "global") require.NoError(err) require.Equal(uint64(1), idx) require.Equal(expected, config) @@ -84,7 +105,7 @@ func TestStore_ConfigEntryCAS(t *testing.T) { require.NoError(err) // Entry should not be changed - idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + idx, config, err = s.ConfigEntry(nil, structs.ProxyDefaults, "global") require.NoError(err) require.Equal(uint64(1), idx) require.Equal(expected, config) @@ -95,7 +116,7 @@ func TestStore_ConfigEntryCAS(t *testing.T) { require.NoError(err) // Entry should be updated - idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") + idx, config, err = s.ConfigEntry(nil, structs.ProxyDefaults, "global") require.NoError(err) require.Equal(uint64(2), idx) require.Equal(updated, config) @@ -114,25 +135,42 @@ func TestStore_ConfigEntries(t *testing.T) { Kind: structs.ServiceDefaults, Name: "test2", } + entry3 := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "test3", + } require.NoError(s.EnsureConfigEntry(0, entry1)) require.NoError(s.EnsureConfigEntry(1, entry2)) + require.NoError(s.EnsureConfigEntry(2, entry3)) // Get all entries - idx, entries, err := s.ConfigEntries() + idx, entries, err := s.ConfigEntries(nil) require.NoError(err) - require.Equal(uint64(1), idx) - require.Equal([]structs.ConfigEntry{entry1, entry2}, entries) + require.Equal(uint64(2), idx) + require.Equal([]structs.ConfigEntry{entry1, entry2, entry3}, entries) // Get all proxy entries - idx, entries, err = s.ConfigEntriesByKind(structs.ProxyDefaults) + idx, entries, err = s.ConfigEntriesByKind(nil, structs.ProxyDefaults) require.NoError(err) - require.Equal(uint64(1), idx) + require.Equal(uint64(2), idx) require.Equal([]structs.ConfigEntry{entry1}, entries) // Get all service entries - idx, entries, err = s.ConfigEntriesByKind(structs.ServiceDefaults) + ws := memdb.NewWatchSet() + idx, entries, err = s.ConfigEntriesByKind(ws, structs.ServiceDefaults) require.NoError(err) - require.Equal(uint64(1), idx) - require.Equal([]structs.ConfigEntry{entry2}, entries) + require.Equal(uint64(2), idx) + require.Equal([]structs.ConfigEntry{entry2, entry3}, entries) + + // Watch should not have fired + require.False(watchFired(ws)) + + // Now make an update and make sure the watch fires. + require.NoError(s.EnsureConfigEntry(3, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "test2", + Protocol: "tcp", + })) + require.True(watchFired(ws)) } diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 76f508873..f89154b86 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -31,11 +31,10 @@ type ConfigEntry interface { // ServiceConfiguration is the top-level struct for the configuration of a service // across the entire cluster. type ServiceConfigEntry struct { - Kind string - Name string - Protocol string - Connect ConnectConfiguration - ServiceDefinitionDefaults ServiceDefinitionDefaults + Kind string + Name string + Protocol string + Connect ConnectConfiguration RaftIndex } @@ -83,26 +82,6 @@ type ConnectConfiguration struct { SidecarProxy bool } -type ServiceDefinitionDefaults struct { - EnableTagOverride bool - - // Non script/docker checks only - Check *HealthCheck - Checks HealthChecks - - // Kind is allowed to accommodate non-sidecar proxies but it will be an error - // if they also set Connect.DestinationServiceID since sidecars are - // configured via their associated service's config. - Kind ServiceKind - - // Only DestinationServiceName and Config are supported. - Proxy ConnectProxyConfig - - Connect ServiceConnect - - Weights Weights -} - // ProxyConfigEntry is the top-level struct for global proxy configuration defaults. type ProxyConfigEntry struct { Kind string @@ -130,6 +109,7 @@ func (e *ProxyConfigEntry) Normalize() error { } e.Kind = ProxyDefaults + e.Name = ProxyConfigGlobal return nil } @@ -161,18 +141,26 @@ const ( ConfigEntryDelete ConfigEntryOp = "delete" ) +// ConfigEntryRequest is used when creating/updating/deleting a ConfigEntry. type ConfigEntryRequest struct { - Op ConfigEntryOp - Entry ConfigEntry + Op ConfigEntryOp + Datacenter string + Entry ConfigEntry + + WriteRequest } -func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) { +func (c *ConfigEntryRequest) RequestDatacenter() string { + return c.Datacenter +} + +func (c *ConfigEntryRequest) MarshalBinary() (data []byte, err error) { // bs will grow if needed but allocate enough to avoid reallocation in common // case. bs := make([]byte, 128) enc := codec.NewEncoderBytes(&bs, msgpackHandle) // Encode kind first - err = enc.Encode(r.Entry.GetKind()) + err = enc.Encode(c.Entry.GetKind()) if err != nil { return nil, err } @@ -181,7 +169,7 @@ func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) { err = enc.Encode(struct { *Alias }{ - Alias: (*Alias)(r), + Alias: (*Alias)(c), }) if err != nil { return nil, err @@ -189,7 +177,7 @@ func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) { return bs, nil } -func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error { +func (c *ConfigEntryRequest) UnmarshalBinary(data []byte) error { // First decode the kind prefix var kind string dec := codec.NewDecoderBytes(data, msgpackHandle) @@ -202,7 +190,7 @@ func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error { if err != nil { return err } - r.Entry = entry + c.Entry = entry // Alias juggling to prevent infinite recursive calls back to this decode // method. @@ -210,7 +198,7 @@ func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error { as := struct { *Alias }{ - Alias: (*Alias)(r), + Alias: (*Alias)(c), } if err := dec.Decode(&as); err != nil { return err @@ -228,3 +216,35 @@ func makeConfigEntry(kind string) (ConfigEntry, error) { return nil, fmt.Errorf("invalid config entry kind: %s", kind) } } + +// ConfigEntryQuery is used when requesting info about a config entry. +type ConfigEntryQuery struct { + Kind string + Name string + Datacenter string + + QueryOptions +} + +func (c *ConfigEntryQuery) RequestDatacenter() string { + return c.Datacenter +} + +// ServiceConfigRequest is used when requesting the resolved configuration +// for a service. +type ServiceConfigRequest struct { + Name string + Datacenter string + + QueryOptions +} + +func (s *ServiceConfigRequest) RequestDatacenter() string { + return s.Datacenter +} + +type ServiceConfigResponse struct { + Definition ServiceDefinition + + QueryMeta +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 56d86ab52..ba55da377 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1140,6 +1140,82 @@ type IndexedNodeDump struct { QueryMeta } +// IndexedConfigEntries has its own encoding logic which differs from +// ConfigEntryRequest as it has to send a slice of ConfigEntry. +type IndexedConfigEntries struct { + Entries []ConfigEntry + QueryMeta +} + +func (c *IndexedConfigEntries) MarshalBinary() (data []byte, err error) { + // bs will grow if needed but allocate enough to avoid reallocation in common + // case. + bs := make([]byte, 128) + enc := codec.NewEncoderBytes(&bs, msgpackHandle) + + // Encode kinds of entries first + err = enc.Encode(len(c.Entries)) + if err != nil { + return nil, err + } + for _, entry := range c.Entries { + err = enc.Encode(entry.GetKind()) + if err != nil { + return nil, err + } + } + + // Then actual value using alias trick to avoid infinite recursion + type Alias IndexedConfigEntries + err = enc.Encode(struct { + *Alias + }{ + Alias: (*Alias)(c), + }) + if err != nil { + return nil, err + } + return bs, nil +} + +func (c *IndexedConfigEntries) UnmarshalBinary(data []byte) error { + // First decode the number of entries + var numEntries int + dec := codec.NewDecoderBytes(data, msgpackHandle) + if err := dec.Decode(&numEntries); err != nil { + return err + } + + c.Entries = make([]ConfigEntry, numEntries) + for i := 0; i < numEntries; i++ { + // First decode the kind prefix + var kind string + if err := dec.Decode(&kind); err != nil { + return err + } + + // Then decode the real thing with appropriate kind of ConfigEntry + entry, err := makeConfigEntry(kind) + if err != nil { + return err + } + c.Entries[i] = entry + } + + // Alias juggling to prevent infinite recursive calls back to this decode + // method. + type Alias IndexedConfigEntries + as := struct { + *Alias + }{ + Alias: (*Alias)(c), + } + if err := dec.Decode(&as); err != nil { + return err + } + return nil +} + // DirEntry is used to represent a directory entry. This is // used for values in our Key-Value store. type DirEntry struct {