From 3deccdde5669f42e5710b3b605b9b0895f400b3b Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 28 Sep 2020 17:11:51 -0400 Subject: [PATCH] subscribe: add integration test for filtering events by acl --- agent/subscribe/subscribe_test.go | 369 +++++++++++------------------- 1 file changed, 128 insertions(+), 241 deletions(-) diff --git a/agent/subscribe/subscribe_test.go b/agent/subscribe/subscribe_test.go index 4b660c402..bacd94253 100644 --- a/agent/subscribe/subscribe_test.go +++ b/agent/subscribe/subscribe_test.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/grpc" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbsubscribe" @@ -29,7 +30,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { backend, err := newTestBackend() require.NoError(t, err) srv := &Server{Backend: backend, Logger: hclog.New(nil)} - addr := newTestServer(t, srv) ids := newCounter() @@ -273,12 +273,12 @@ func assertDeepEqual(t *testing.T, x, y interface{}) { type testBackend struct { store *state.Store - authorizer acl.Authorizer + authorizer func(token string) acl.Authorizer forwardConn *gogrpc.ClientConn } -func (b testBackend) ResolveToken(_ string) (acl.Authorizer, error) { - return b.authorizer, nil +func (b testBackend) ResolveToken(token string) (acl.Authorizer, error) { + return b.authorizer(token), nil } func (b testBackend) Forward(_ string, fn func(*gogrpc.ClientConn) error) (handled bool, err error) { @@ -301,7 +301,10 @@ func newTestBackend() (*testBackend, error) { if err != nil { return nil, err } - return &testBackend{store: store, authorizer: acl.AllowAll()}, nil + allowAll := func(_ string) acl.Authorizer { + return acl.AllowAll() + } + return &testBackend{store: store, authorizer: allowAll}, nil } var _ Backend = (*testBackend)(nil) @@ -395,7 +398,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { Port: 9000, }, } - require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("req1"), req)) + require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg1"), req)) } { req := &structs.RegisterRequest{ @@ -577,236 +580,128 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { assertDeepEqual(t, expectedEvent, event) } -/* TODO -func TestStreaming_Subscribe_SkipSnapshot(t *testing.T) { - t.Parallel() +// TODO: test case for converting stream.Events to pbsubscribe.Events, including framing events - require := require.New(t) - dir1, server := testServerWithConfig(t, func(c *Config) { - c.Datacenter = "dc1" - c.Bootstrap = true - c.GRPCEnabled = true - }) - defer os.RemoveAll(dir1) - defer server.Shutdown() - codec := rpcClient(t, server) - defer codec.Close() - - dir2, client := testClientWithConfig(t, func(c *Config) { - c.Datacenter = "dc1" - c.NodeName = uniqueNodeName(t.Name()) - c.GRPCEnabled = true - }) - defer os.RemoveAll(dir2) - defer client.Shutdown() - - // Try to join - testrpc.WaitForLeader(t, server.RPC, "dc1") - joinLAN(t, client, server) - testrpc.WaitForTestAgent(t, client.RPC, "dc1") - - // Register a dummy node with our service on it. - { - req := &structs.RegisterRequest{ - Node: "node1", - Address: "3.4.5.6", - Datacenter: "dc1", - Service: &structs.NodeService{ - ID: "redis1", - Service: "redis", - Address: "3.4.5.6", - Port: 8080, - }, - } - var out struct{} - require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &out)) +func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testing.T) { + if testing.Short() { + t.Skip("too slow for -short run") } - // Start a Subscribe call to our streaming endpoint. - conn, err := client.GRPCConn() - require.NoError(err) - - streamClient := pbsubscribe.NewConsulClient(conn) - - var index uint64 - { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis"}) - require.NoError(err) - - // Start a goroutine to read updates off the pbsubscribe. - eventCh := make(chan *pbsubscribe.Event, 0) - go recvEvents(t, eventCh, streamHandle) - - var snapshotEvents []*pbsubscribe.Event - for i := 0; i < 2; i++ { - select { - case event := <-eventCh: - snapshotEvents = append(snapshotEvents, event) - case <-time.After(3 * time.Second): - t.Fatalf("did not receive events past %d", len(snapshotEvents)) - } - } - - // Save the index from the event - index = snapshotEvents[0].Index - } - - // Start another Subscribe call passing the index from the last event. - { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", - Index: index, - }) - require.NoError(err) - - // Start a goroutine to read updates off the pbsubscribe. - eventCh := make(chan *pbsubscribe.Event, 0) - go recvEvents(t, eventCh, streamHandle) - - // We should get no snapshot and the first event should be "resume stream" - select { - case event := <-eventCh: - require.True(event.GetResumeStream()) - case <-time.After(500 * time.Millisecond): - t.Fatalf("never got event") - } - - // Wait and make sure there aren't any events coming. The server shouldn't send - // a snapshot and we haven't made any updates to the catalog that would trigger - // more events. - select { - case event := <-eventCh: - t.Fatalf("got another event: %v", event) - case <-time.After(500 * time.Millisecond): - } - } -} - -func TestStreaming_Subscribe_FilterACL(t *testing.T) { - t.Parallel() - - require := require.New(t) - dir, _, server, codec := testACLFilterServerWithConfigFn(t, func(c *Config) { - c.ACLDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLMasterToken = "root" - c.ACLDefaultPolicy = "deny" - c.ACLEnforceVersion8 = true - c.GRPCEnabled = true - }) - defer os.RemoveAll(dir) - defer server.Shutdown() - defer codec.Close() - - dir2, client := testClientWithConfig(t, func(c *Config) { - c.Datacenter = "dc1" - c.NodeName = uniqueNodeName(t.Name()) - c.GRPCEnabled = true - }) - defer os.RemoveAll(dir2) - defer client.Shutdown() - - // Try to join - testrpc.WaitForLeader(t, server.RPC, "dc1") - joinLAN(t, client, server) - testrpc.WaitForTestAgent(t, client.RPC, "dc1", testrpc.WithToken("root")) + backend, err := newTestBackend() + require.NoError(t, err) + srv := &Server{Backend: backend, Logger: hclog.New(nil)} + addr := newTestServer(t, srv) // Create a policy for the test token. - policyReq := structs.ACLPolicySetRequest{ - Datacenter: "dc1", - Policy: structs.ACLPolicy{ - Description: "foobar", - Name: "baz", - Rules: fmt.Sprintf(` - service "foo" { - policy = "write" - } - node "%s" { - policy = "write" - } - `, server.config.NodeName), - }, - WriteRequest: structs.WriteRequest{Token: "root"}, - } - resp := structs.ACLPolicy{} - require.NoError(msgpackrpc.CallWithCodec(codec, "ACL.PolicySet", &policyReq, &resp)) + rules := ` +service "foo" { + policy = "write" +} +node "node1" { + policy = "write" +} +` + authorizer, err := acl.NewAuthorizerFromRules( + "1", 0, rules, acl.SyntaxCurrent, + &acl.Config{WildcardName: structs.WildcardSpecifier}, + nil) + require.NoError(t, err) + authorizer = acl.NewChainedAuthorizer([]acl.Authorizer{authorizer, acl.DenyAll()}) + require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil)) - // Create a new token that only has access to one node. - var token structs.ACLToken - arg := structs.ACLTokenSetRequest{ - Datacenter: "dc1", - ACLToken: structs.ACLToken{ - Policies: []structs.ACLTokenPolicyLink{ - structs.ACLTokenPolicyLink{ - ID: resp.ID, - }, + // TODO: is there any easy way to do this with the acl package? + token := "this-token-is-good" + backend.authorizer = func(tok string) acl.Authorizer { + if tok == token { + return authorizer + } + return acl.DenyAll() + } + + ids := newCounter() + { + req := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "node1", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "foo", + Service: "foo", }, - }, - WriteRequest: structs.WriteRequest{Token: "root"}, - } - require.NoError(msgpackrpc.CallWithCodec(codec, "ACL.TokenSet", &arg, &token)) - auth, err := server.ResolveToken(token.SecretID) - require.NoError(err) - require.Equal(auth.NodeRead("denied", nil), acl.Deny) + Check: &structs.HealthCheck{ + CheckID: "service:foo", + Name: "service:foo", + Node: "node1", + ServiceID: "foo", + Status: api.HealthPassing, + }, + } + require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg1"), req)) - // Register another instance of service foo on a fake node the token doesn't have access to. - regArg := structs.RegisterRequest{ - Datacenter: "dc1", - Node: "denied", - Address: "127.0.0.1", - Service: &structs.NodeService{ - ID: "foo", - Service: "foo", - }, - WriteRequest: structs.WriteRequest{Token: "root"}, - } - require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", ®Arg, nil)) + // Register a service which should be denied + req = &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "node1", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "bar", + Service: "bar", + }, + Check: &structs.HealthCheck{ + CheckID: "service:bar", + Name: "service:bar", + Node: "node1", + ServiceID: "bar", + }, + } + require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg2"), req)) - // Set up the gRPC client. - conn, err := client.GRPCConn() - require.NoError(err) - streamClient := pbsubscribe.NewConsulClient(conn) + req = &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "denied", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "foo", + Service: "foo", + }, + } + require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req)) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + + conn, err := gogrpc.DialContext(ctx, addr.String(), gogrpc.WithInsecure()) + require.NoError(t, err) + t.Cleanup(logError(t, conn.Close)) + streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) // Start a Subscribe call to our streaming endpoint for the service we have access to. { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ Topic: pbsubscribe.Topic_ServiceHealth, Key: "foo", - Token: token.SecretID, + Token: token, }) - require.NoError(err) + require.NoError(t, err) - // Start a goroutine to read updates off the pbsubscribe. - eventCh := make(chan *pbsubscribe.Event, 0) - go recvEvents(t, eventCh, streamHandle) + chEvents := make(chan eventOrError, 0) + go recvEvents(chEvents, streamHandle) - // Read events off the pbsubscribe. We should not see any events for the filtered node. var snapshotEvents []*pbsubscribe.Event for i := 0; i < 2; i++ { - select { - case event := <-eventCh: - snapshotEvents = append(snapshotEvents, event) - case <-time.After(5 * time.Second): - t.Fatalf("did not receive events past %d", len(snapshotEvents)) - } + snapshotEvents = append(snapshotEvents, getEvent(t, chEvents)) } - require.Len(snapshotEvents, 2) - require.Equal("foo", snapshotEvents[0].GetServiceHealth().CheckServiceNode.Service.Service) - require.Equal(server.config.NodeName, snapshotEvents[0].GetServiceHealth().CheckServiceNode.Node.Node) - require.True(snapshotEvents[1].GetEndOfSnapshot()) + + require.Len(t, snapshotEvents, 2) + require.Equal(t, "foo", snapshotEvents[0].GetServiceHealth().CheckServiceNode.Service.Service) + require.Equal(t, "node1", snapshotEvents[0].GetServiceHealth().CheckServiceNode.Node.Node) + require.True(t, snapshotEvents[1].GetEndOfSnapshot()) // Update the service with a new port to trigger a new event. - regArg := structs.RegisterRequest{ + req := &structs.RegisterRequest{ Datacenter: "dc1", - Node: server.config.NodeName, + Node: "node1", Address: "127.0.0.1", Service: &structs.NodeService{ ID: "foo", @@ -818,22 +713,19 @@ func TestStreaming_Subscribe_FilterACL(t *testing.T) { Name: "service:foo", ServiceID: "foo", Status: api.HealthPassing, + Node: "node1", }, WriteRequest: structs.WriteRequest{Token: "root"}, } - require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", ®Arg, nil)) + require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg4"), req)) - select { - case event := <-eventCh: - service := event.GetServiceHealth().CheckServiceNode.Service - require.Equal("foo", service.Service) - require.Equal(1234, service.Port) - case <-time.After(5 * time.Second): - t.Fatalf("did not receive events past %d", len(snapshotEvents)) - } + event := getEvent(t, chEvents) + service := event.GetServiceHealth().CheckServiceNode.Service + require.Equal(t, "foo", service.Service) + require.Equal(t, int32(1234), service.Port) // Now update the service on the denied node and make sure we don't see an event. - regArg = structs.RegisterRequest{ + req = &structs.RegisterRequest{ Datacenter: "dc1", Node: "denied", Address: "127.0.0.1", @@ -847,13 +739,14 @@ func TestStreaming_Subscribe_FilterACL(t *testing.T) { Name: "service:foo", ServiceID: "foo", Status: api.HealthPassing, + Node: "denied", }, WriteRequest: structs.WriteRequest{Token: "root"}, } - require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", ®Arg, nil)) + require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg5"), req)) select { - case event := <-eventCh: + case event := <-chEvents: t.Fatalf("should not have received event: %v", event) case <-time.After(500 * time.Millisecond): } @@ -861,30 +754,22 @@ func TestStreaming_Subscribe_FilterACL(t *testing.T) { // Start another subscribe call for bar, which the token shouldn't have access to. { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ Topic: pbsubscribe.Topic_ServiceHealth, Key: "bar", - Token: token.SecretID, + Token: token, }) - require.NoError(err) + require.NoError(t, err) - // Start a goroutine to read updates off the pbsubscribe. - eventCh := make(chan *pbsubscribe.Event, 0) - go recvEvents(t, eventCh, streamHandle) + chEvents := make(chan eventOrError, 0) + go recvEvents(chEvents, streamHandle) - select { - case event := <-eventCh: - require.True(event.GetEndOfSnapshot()) - case <-time.After(3 * time.Second): - t.Fatal("did not receive event") - } + require.True(t, getEvent(t, chEvents).GetEndOfSnapshot()) // Update the service and make sure we don't get a new event. - regArg := structs.RegisterRequest{ + req := &structs.RegisterRequest{ Datacenter: "dc1", - Node: server.config.NodeName, + Node: "node1", Address: "127.0.0.1", Service: &structs.NodeService{ ID: "bar", @@ -895,19 +780,21 @@ func TestStreaming_Subscribe_FilterACL(t *testing.T) { CheckID: "service:bar", Name: "service:bar", ServiceID: "bar", + Node: "node1", }, WriteRequest: structs.WriteRequest{Token: "root"}, } - require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", ®Arg, nil)) + require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg6"), req)) select { - case event := <-eventCh: + case event := <-chEvents: t.Fatalf("should not have received event: %v", event) case <-time.After(500 * time.Millisecond): } } } +/* func TestStreaming_Subscribe_ACLUpdate(t *testing.T) { t.Parallel()