Finish cleanup from ServiceConfigRequest changes
This commit is contained in:
parent
a67c92b961
commit
940b7a98d1
|
@ -1948,7 +1948,6 @@ type addServiceLockedRequest struct {
|
||||||
// agent using Agent.AddService.
|
// agent using Agent.AddService.
|
||||||
type AddServiceRequest struct {
|
type AddServiceRequest struct {
|
||||||
Service *structs.NodeService
|
Service *structs.NodeService
|
||||||
nodeName string
|
|
||||||
chkTypes []*structs.CheckType
|
chkTypes []*structs.CheckType
|
||||||
persist bool
|
persist bool
|
||||||
token string
|
token string
|
||||||
|
@ -3108,7 +3107,6 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
||||||
err = a.addServiceLocked(addServiceLockedRequest{
|
err = a.addServiceLocked(addServiceLockedRequest{
|
||||||
AddServiceRequest: AddServiceRequest{
|
AddServiceRequest: AddServiceRequest{
|
||||||
Service: ns,
|
Service: ns,
|
||||||
nodeName: a.config.NodeName,
|
|
||||||
chkTypes: chkTypes,
|
chkTypes: chkTypes,
|
||||||
persist: false, // don't rewrite the file with the same data we just read
|
persist: false, // don't rewrite the file with the same data we just read
|
||||||
token: service.Token,
|
token: service.Token,
|
||||||
|
@ -3129,7 +3127,6 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
||||||
err = a.addServiceLocked(addServiceLockedRequest{
|
err = a.addServiceLocked(addServiceLockedRequest{
|
||||||
AddServiceRequest: AddServiceRequest{
|
AddServiceRequest: AddServiceRequest{
|
||||||
Service: sidecar,
|
Service: sidecar,
|
||||||
nodeName: a.config.NodeName,
|
|
||||||
chkTypes: sidecarChecks,
|
chkTypes: sidecarChecks,
|
||||||
persist: false, // don't rewrite the file with the same data we just read
|
persist: false, // don't rewrite the file with the same data we just read
|
||||||
token: sidecarToken,
|
token: sidecarToken,
|
||||||
|
@ -3228,7 +3225,6 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
||||||
err = a.addServiceLocked(addServiceLockedRequest{
|
err = a.addServiceLocked(addServiceLockedRequest{
|
||||||
AddServiceRequest: AddServiceRequest{
|
AddServiceRequest: AddServiceRequest{
|
||||||
Service: p.Service,
|
Service: p.Service,
|
||||||
nodeName: a.config.NodeName,
|
|
||||||
chkTypes: nil,
|
chkTypes: nil,
|
||||||
persist: false, // don't rewrite the file with the same data we just read
|
persist: false, // don't rewrite the file with the same data we just read
|
||||||
token: p.Token,
|
token: p.Token,
|
||||||
|
|
|
@ -994,7 +994,6 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.
|
||||||
|
|
||||||
addReq := AddServiceRequest{
|
addReq := AddServiceRequest{
|
||||||
Service: ns,
|
Service: ns,
|
||||||
nodeName: s.agent.config.NodeName,
|
|
||||||
chkTypes: chkTypes,
|
chkTypes: chkTypes,
|
||||||
persist: true,
|
persist: true,
|
||||||
token: token,
|
token: token,
|
||||||
|
@ -1008,7 +1007,6 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.
|
||||||
if sidecar != nil {
|
if sidecar != nil {
|
||||||
addReq := AddServiceRequest{
|
addReq := AddServiceRequest{
|
||||||
Service: sidecar,
|
Service: sidecar,
|
||||||
nodeName: s.agent.config.NodeName,
|
|
||||||
chkTypes: sidecarChecks,
|
chkTypes: sidecarChecks,
|
||||||
persist: true,
|
persist: true,
|
||||||
token: sidecarToken,
|
token: sidecarToken,
|
||||||
|
|
|
@ -25,8 +25,6 @@ func TestResolvedServiceConfig(t *testing.T) {
|
||||||
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
|
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
|
||||||
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
|
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
|
||||||
require.Equal("foo", req.Name)
|
require.Equal("foo", req.Name)
|
||||||
require.Equal("foo-1", req.ID)
|
|
||||||
require.Equal("foo-node", req.NodeName)
|
|
||||||
require.True(req.AllowStale)
|
require.True(req.AllowStale)
|
||||||
|
|
||||||
reply := args.Get(2).(*structs.ServiceConfigResponse)
|
reply := args.Get(2).(*structs.ServiceConfigResponse)
|
||||||
|
@ -50,8 +48,6 @@ func TestResolvedServiceConfig(t *testing.T) {
|
||||||
}, &structs.ServiceConfigRequest{
|
}, &structs.ServiceConfigRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
ID: "foo-1",
|
|
||||||
NodeName: "foo-node",
|
|
||||||
})
|
})
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(cache.FetchResult{
|
require.Equal(cache.FetchResult{
|
||||||
|
|
|
@ -333,7 +333,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
||||||
|
|
||||||
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
|
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
|
||||||
// blocking query, this function will be rerun and these state store lookups will both be current.
|
// blocking query, this function will be rerun and these state store lookups will both be current.
|
||||||
// We use the default enterprise meta to look up the global proxy defaults because their are not namespaced.
|
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
|
||||||
_, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMeta())
|
_, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMeta())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -1145,269 +1145,6 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConfigEntry_ResolveServiceConfig_Upstreams_RegistrationBlocking(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("too slow for testing.Short")
|
|
||||||
}
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
dir1, s1 := testServer(t)
|
|
||||||
defer os.RemoveAll(dir1)
|
|
||||||
defer s1.Shutdown()
|
|
||||||
|
|
||||||
codec := rpcClient(t, s1)
|
|
||||||
defer codec.Close()
|
|
||||||
|
|
||||||
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
|
|
||||||
|
|
||||||
nodeName := "foo-node"
|
|
||||||
|
|
||||||
// Create a dummy proxy/service config in the state store to look up.
|
|
||||||
state := s1.fsm.State()
|
|
||||||
require.NoError(t, state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
|
|
||||||
Kind: structs.ProxyDefaults,
|
|
||||||
Name: structs.ProxyConfigGlobal,
|
|
||||||
Config: map[string]interface{}{
|
|
||||||
"foo": 1,
|
|
||||||
},
|
|
||||||
}))
|
|
||||||
require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
|
|
||||||
Kind: structs.ServiceDefaults,
|
|
||||||
Name: "foo",
|
|
||||||
Protocol: "http",
|
|
||||||
}))
|
|
||||||
require.NoError(t, state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
|
|
||||||
Kind: structs.ServiceDefaults,
|
|
||||||
Name: "bar",
|
|
||||||
Protocol: "grpc",
|
|
||||||
}))
|
|
||||||
require.NoError(t, state.EnsureNode(4, &structs.Node{
|
|
||||||
ID: "9c6e733c-c39d-4555-8d41-0f174a31c489",
|
|
||||||
Node: nodeName,
|
|
||||||
}))
|
|
||||||
|
|
||||||
args := structs.ServiceConfigRequest{
|
|
||||||
Name: "foo",
|
|
||||||
Datacenter: s1.config.Datacenter,
|
|
||||||
Upstreams: []string{"bar", "baz"},
|
|
||||||
}
|
|
||||||
var out structs.ServiceConfigResponse
|
|
||||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out))
|
|
||||||
|
|
||||||
var index uint64
|
|
||||||
expected := structs.ServiceConfigResponse{
|
|
||||||
ProxyConfig: map[string]interface{}{
|
|
||||||
"foo": int64(1),
|
|
||||||
"protocol": "http",
|
|
||||||
},
|
|
||||||
// This mesh gateway configuration is pulled from foo-proxy's registration
|
|
||||||
UpstreamConfigs: map[string]map[string]interface{}{
|
|
||||||
"bar": {
|
|
||||||
"protocol": "grpc",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Don't know what this is deterministically
|
|
||||||
QueryMeta: out.QueryMeta,
|
|
||||||
}
|
|
||||||
require.Equal(t, expected, out)
|
|
||||||
index = out.Index
|
|
||||||
|
|
||||||
// Now setup a blocking query for 'foo' while we add the proxy registration for foo-proxy.
|
|
||||||
// Adding the foo proxy registration should cause the blocking query to fire because it is
|
|
||||||
// watched when the ID and NodeName are provided.
|
|
||||||
{
|
|
||||||
// Async cause a change
|
|
||||||
start := time.Now()
|
|
||||||
go func() {
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
require.NoError(t, state.EnsureService(index+1, nodeName, &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
MeshGateway: structs.MeshGatewayConfig{
|
|
||||||
Mode: structs.MeshGatewayModeLocal,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}))
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Re-run the query
|
|
||||||
var out structs.ServiceConfigResponse
|
|
||||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
|
|
||||||
&structs.ServiceConfigRequest{
|
|
||||||
Name: "foo",
|
|
||||||
Datacenter: "dc1",
|
|
||||||
Upstreams: []string{"bar", "baz"},
|
|
||||||
QueryOptions: structs.QueryOptions{
|
|
||||||
MinQueryIndex: index,
|
|
||||||
MaxQueryTime: time.Second,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&out,
|
|
||||||
))
|
|
||||||
|
|
||||||
// Should block at least 100ms
|
|
||||||
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")
|
|
||||||
|
|
||||||
// Check the indexes
|
|
||||||
require.Equal(t, out.Index, index+1)
|
|
||||||
|
|
||||||
// The mesh gateway config from the proxy registration should no longer be present
|
|
||||||
expected := structs.ServiceConfigResponse{
|
|
||||||
ProxyConfig: map[string]interface{}{
|
|
||||||
"foo": int64(1),
|
|
||||||
"protocol": "http",
|
|
||||||
},
|
|
||||||
UpstreamConfigs: map[string]map[string]interface{}{
|
|
||||||
"bar": {
|
|
||||||
"protocol": "grpc",
|
|
||||||
"mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)},
|
|
||||||
},
|
|
||||||
"baz": {
|
|
||||||
"mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Don't know what this is deterministically
|
|
||||||
QueryMeta: out.QueryMeta,
|
|
||||||
}
|
|
||||||
require.Equal(t, expected, out)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConfigEntry_ResolveServiceConfig_Upstreams_DegistrationBlocking(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("too slow for testing.Short")
|
|
||||||
}
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
dir1, s1 := testServer(t)
|
|
||||||
defer os.RemoveAll(dir1)
|
|
||||||
defer s1.Shutdown()
|
|
||||||
|
|
||||||
codec := rpcClient(t, s1)
|
|
||||||
defer codec.Close()
|
|
||||||
|
|
||||||
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
|
|
||||||
|
|
||||||
nodeName := "foo-node"
|
|
||||||
|
|
||||||
// Create a dummy proxy/service config in the state store to look up.
|
|
||||||
state := s1.fsm.State()
|
|
||||||
require.NoError(t, state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
|
|
||||||
Kind: structs.ProxyDefaults,
|
|
||||||
Name: structs.ProxyConfigGlobal,
|
|
||||||
Config: map[string]interface{}{
|
|
||||||
"foo": 1,
|
|
||||||
},
|
|
||||||
}))
|
|
||||||
require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
|
|
||||||
Kind: structs.ServiceDefaults,
|
|
||||||
Name: "foo",
|
|
||||||
Protocol: "http",
|
|
||||||
}))
|
|
||||||
require.NoError(t, state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
|
|
||||||
Kind: structs.ServiceDefaults,
|
|
||||||
Name: "bar",
|
|
||||||
Protocol: "grpc",
|
|
||||||
}))
|
|
||||||
require.NoError(t, state.EnsureNode(4, &structs.Node{
|
|
||||||
ID: "9c6e733c-c39d-4555-8d41-0f174a31c489",
|
|
||||||
Node: nodeName,
|
|
||||||
}))
|
|
||||||
|
|
||||||
registration := structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
MeshGateway: structs.MeshGatewayConfig{
|
|
||||||
Mode: structs.MeshGatewayModeLocal,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
require.NoError(t, state.EnsureService(5, nodeName, ®istration))
|
|
||||||
|
|
||||||
args := structs.ServiceConfigRequest{
|
|
||||||
Name: "foo",
|
|
||||||
Datacenter: s1.config.Datacenter,
|
|
||||||
MeshGateway: registration.Proxy.MeshGateway,
|
|
||||||
Upstreams: []string{"bar", "baz"},
|
|
||||||
}
|
|
||||||
var out structs.ServiceConfigResponse
|
|
||||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out))
|
|
||||||
|
|
||||||
var index uint64
|
|
||||||
expected := structs.ServiceConfigResponse{
|
|
||||||
ProxyConfig: map[string]interface{}{
|
|
||||||
"foo": int64(1),
|
|
||||||
"protocol": "http",
|
|
||||||
},
|
|
||||||
// This mesh gateway configuration is pulled from foo-proxy's registration
|
|
||||||
UpstreamConfigs: map[string]map[string]interface{}{
|
|
||||||
"bar": {
|
|
||||||
"protocol": "grpc",
|
|
||||||
"mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)},
|
|
||||||
},
|
|
||||||
"baz": {
|
|
||||||
"mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Don't know what this is deterministically
|
|
||||||
QueryMeta: out.QueryMeta,
|
|
||||||
}
|
|
||||||
require.Equal(t, expected, out)
|
|
||||||
index = out.Index
|
|
||||||
|
|
||||||
// Now setup a blocking query for 'foo' while we erase the proxy registration for foo-proxy.
|
|
||||||
// Deleting the foo proxy registration should cause the blocking query to fire because it is
|
|
||||||
// watched when the ID and NodeName are provided.
|
|
||||||
{
|
|
||||||
// Async cause a change
|
|
||||||
start := time.Now()
|
|
||||||
go func() {
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
require.NoError(t, state.DeleteService(index+1, nodeName, "foo-proxy", nil))
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Re-run the query
|
|
||||||
var out structs.ServiceConfigResponse
|
|
||||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
|
|
||||||
&structs.ServiceConfigRequest{
|
|
||||||
Name: "foo",
|
|
||||||
Datacenter: "dc1",
|
|
||||||
MeshGateway: registration.Proxy.MeshGateway,
|
|
||||||
Upstreams: []string{"bar", "baz"},
|
|
||||||
QueryOptions: structs.QueryOptions{
|
|
||||||
MinQueryIndex: index,
|
|
||||||
MaxQueryTime: time.Second,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&out,
|
|
||||||
))
|
|
||||||
|
|
||||||
// Should block at least 100ms
|
|
||||||
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")
|
|
||||||
|
|
||||||
// Check the indexes
|
|
||||||
require.Equal(t, out.Index, index+1)
|
|
||||||
|
|
||||||
// The mesh gateway config from the proxy registration should no longer be present
|
|
||||||
expected := structs.ServiceConfigResponse{
|
|
||||||
ProxyConfig: map[string]interface{}{
|
|
||||||
"foo": int64(1),
|
|
||||||
"protocol": "http",
|
|
||||||
},
|
|
||||||
UpstreamConfigs: map[string]map[string]interface{}{
|
|
||||||
"bar": {
|
|
||||||
"protocol": "grpc",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Don't know what this is deterministically
|
|
||||||
QueryMeta: out.QueryMeta,
|
|
||||||
}
|
|
||||||
require.Equal(t, expected, out)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
|
func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
|
|
|
@ -1136,24 +1136,6 @@ func (s *Store) NodeService(nodeName string, serviceID string, entMeta *structs.
|
||||||
return idx, service, nil
|
return idx, service, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeServiceWatch is used to retrieve a specific service associated with the given
|
|
||||||
// node, and add it to the watch set.
|
|
||||||
func (s *Store) NodeServiceWatch(ws memdb.WatchSet, nodeName string, serviceID string, entMeta *structs.EnterpriseMeta) (uint64, *structs.NodeService, error) {
|
|
||||||
tx := s.db.Txn(false)
|
|
||||||
defer tx.Abort()
|
|
||||||
|
|
||||||
// Get the table index.
|
|
||||||
idx := catalogServicesMaxIndex(tx, entMeta)
|
|
||||||
|
|
||||||
// Query the service
|
|
||||||
service, err := getNodeServiceWatchTxn(tx, ws, nodeName, serviceID, entMeta)
|
|
||||||
if err != nil {
|
|
||||||
return 0, nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return idx, service, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) {
|
func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) {
|
||||||
// Query the service
|
// Query the service
|
||||||
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
|
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
|
||||||
|
@ -1168,21 +1150,6 @@ func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getNodeServiceWatchTxn(tx ReadTxn, ws memdb.WatchSet, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) {
|
|
||||||
// Query the service
|
|
||||||
watchCh, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
|
|
||||||
}
|
|
||||||
ws.Add(watchCh)
|
|
||||||
|
|
||||||
if service != nil {
|
|
||||||
return service.(*structs.ServiceNode).ToNodeService(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *structs.EnterpriseMeta, allowWildcard bool) (bool, uint64, *structs.Node, memdb.ResultIterator, error) {
|
func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *structs.EnterpriseMeta, allowWildcard bool) (bool, uint64, *structs.Node, memdb.ResultIterator, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
Loading…
Reference in New Issue