WatchRoots gRPC endpoint (#12678)

Adds a new gRPC streaming endpoint (WatchRoots) that dataplane clients will
use to fetch the current list of active Connect CA roots and receive new
lists whenever the roots are rotated.
This commit is contained in:
Dan Upton 2022-04-05 15:26:14 +01:00 committed by GitHub
parent b4285b56ee
commit e48c1611ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1473 additions and 216 deletions

3
.changelog/12678.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
ca: Root certificates can now be consumed from a gRPC streaming endpoint: `WatchRoots`
```

View File

@ -664,6 +664,26 @@ func (r *ACLResolver) synthesizePoliciesForNodeIdentities(nodeIdentities []*stru
return syntheticPolicies return syntheticPolicies
} }
// plainACLResolver wraps ACLResolver so that it can be used in other packages
// that cannot import agent/consul wholesale (e.g. because of import cycles).
//
// TODO(agentless): this pattern was copied from subscribeBackend for expediency
// but we should really refactor ACLResolver so it can be passed as a dependency
// to other packages.
type plainACLResolver struct {
resolver *ACLResolver
}
func (r plainACLResolver) ResolveTokenAndDefaultMeta(
token string,
entMeta *structs.EnterpriseMeta,
authzContext *acl.AuthorizerContext,
) (acl.Authorizer, error) {
// ACLResolver.ResolveTokenAndDefaultMeta returns a ACLResolveResult which
// can't be used in other packages, but it embeds acl.Authorizer which can.
return r.resolver.ResolveTokenAndDefaultMeta(token, entMeta, authzContext)
}
func dedupeServiceIdentities(in []*structs.ACLServiceIdentity) []*structs.ACLServiceIdentity { func dedupeServiceIdentities(in []*structs.ACLServiceIdentity) []*structs.ACLServiceIdentity {
// From: https://github.com/golang/go/wiki/SliceTricks#in-place-deduplicate-comparable // From: https://github.com/golang/go/wiki/SliceTricks#in-place-deduplicate-comparable

View File

@ -19,6 +19,7 @@ import (
vaultapi "github.com/hashicorp/vault/api" vaultapi "github.com/hashicorp/vault/api"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"google.golang.org/grpc"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul-net-rpc/net/rpc" "github.com/hashicorp/consul-net-rpc/net/rpc"
@ -550,7 +551,7 @@ func TestCAManager_Initialize_Logging(t *testing.T) {
deps := newDefaultDeps(t, conf1) deps := newDefaultDeps(t, conf1)
deps.Logger = logger deps.Logger = logger
s1, err := NewServer(conf1, deps, nil) s1, err := NewServer(conf1, deps, grpc.NewServer())
require.NoError(t, err) require.NoError(t, err)
defer s1.Shutdown() defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")

View File

@ -12,6 +12,7 @@ import (
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"google.golang.org/grpc"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
@ -1528,7 +1529,7 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
deps := newDefaultDeps(t, config) deps := newDefaultDeps(t, config)
deps.Logger = logger deps.Logger = logger
srv, err := NewServer(config, deps, nil) srv, err := NewServer(config, deps, grpc.NewServer())
require.NoError(t, err) require.NoError(t, err)
defer srv.Shutdown() defer srv.Shutdown()

View File

@ -43,6 +43,7 @@ import (
"github.com/hashicorp/consul/agent/consul/wanfed" "github.com/hashicorp/consul/agent/consul/wanfed"
agentgrpc "github.com/hashicorp/consul/agent/grpc/private" agentgrpc "github.com/hashicorp/consul/agent/grpc/private"
"github.com/hashicorp/consul/agent/grpc/private/services/subscribe" "github.com/hashicorp/consul/agent/grpc/private/services/subscribe"
"github.com/hashicorp/consul/agent/grpc/public/services/connectca"
"github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/router"
@ -632,6 +633,13 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve
// since it can fire events when leadership is obtained. // since it can fire events when leadership is obtained.
go s.monitorLeadership() go s.monitorLeadership()
// Initialize public gRPC server.
connectca.NewServer(connectca.Config{
GetStore: func() connectca.StateStore { return s.FSM().State() },
Logger: logger.Named("grpc-api.connect-ca"),
ACLResolver: plainACLResolver{s.ACLResolver},
}).Register(s.publicGRPCServer)
// Start listening for RPC requests. // Start listening for RPC requests.
go func() { go func() {
if err := s.grpcHandler.Run(); err != nil { if err := s.grpcHandler.Run(); err != nil {

View File

@ -13,6 +13,7 @@ import (
"github.com/google/tcpproxy" "github.com/google/tcpproxy"
"github.com/hashicorp/memberlist" "github.com/hashicorp/memberlist"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"google.golang.org/grpc"
"github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/ipaddr"
@ -263,7 +264,7 @@ func newServer(t *testing.T, c *Config) (*Server, error) {
} }
} }
srv, err := NewServer(c, newDefaultDeps(t, c), nil) srv, err := NewServer(c, newDefaultDeps(t, c), grpc.NewServer())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,6 +1,7 @@
package state package state
import ( import (
"fmt"
"strings" "strings"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
@ -11,6 +12,38 @@ import (
"github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/proto/pbsubscribe"
) )
// EventSubjectService is a stream.Subject used to route and receive events for
// a specific service.
type EventSubjectService struct {
Key string
EnterpriseMeta structs.EnterpriseMeta
overrideKey string
overrideNamespace string
overridePartition string
}
// String satisfies the stream.Subject interface.
func (s EventSubjectService) String() string {
partition := s.EnterpriseMeta.PartitionOrDefault()
if v := s.overridePartition; v != "" {
partition = strings.ToLower(v)
}
namespace := s.EnterpriseMeta.NamespaceOrDefault()
if v := s.overrideNamespace; v != "" {
namespace = strings.ToLower(v)
}
key := s.Key
if v := s.overrideKey; v != "" {
key = v
}
key = strings.ToLower(key)
return partition + "/" + namespace + "/" + key
}
// EventPayloadCheckServiceNode is used as the Payload for a stream.Event to // EventPayloadCheckServiceNode is used as the Payload for a stream.Event to
// indicates changes to a CheckServiceNode for service health. // indicates changes to a CheckServiceNode for service health.
// //
@ -33,25 +66,14 @@ func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bo
} }
func (e EventPayloadCheckServiceNode) Subject() stream.Subject { func (e EventPayloadCheckServiceNode) Subject() stream.Subject {
partition := e.Value.Service.PartitionOrDefault() return EventSubjectService{
if e.overridePartition != "" { Key: e.Value.Service.Service,
partition = e.overridePartition EnterpriseMeta: e.Value.Service.EnterpriseMeta,
}
partition = strings.ToLower(partition)
namespace := e.Value.Service.NamespaceOrDefault() overrideKey: e.overrideKey,
if e.overrideNamespace != "" { overrideNamespace: e.overrideNamespace,
namespace = e.overrideNamespace overridePartition: e.overridePartition,
} }
namespace = strings.ToLower(namespace)
key := e.Value.Service.Service
if e.overrideKey != "" {
key = e.overrideKey
}
key = strings.ToLower(key)
return stream.Subject(partition + "/" + namespace + "/" + key)
} }
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
@ -62,7 +84,13 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
defer tx.Abort() defer tx.Abort()
connect := topic == topicServiceHealthConnect connect := topic == topicServiceHealthConnect
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, &req.EnterpriseMeta)
subject, ok := req.Subject.(EventSubjectService)
if !ok {
return 0, fmt.Errorf("expected SubscribeRequest.Subject to be a: state.EventSubjectService, was a: %T", req.Subject)
}
idx, nodes, err := checkServiceNodesTxn(tx, nil, subject.Key, connect, &subject.EnterpriseMeta)
if err != nil { if err != nil {
return 0, err return 0, err
} }

View File

@ -16,11 +16,10 @@ import (
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
) )
func TestEventPayloadCheckServiceNode_SubjectMatchesRequests(t *testing.T) { func TestEventPayloadCheckServiceNode_Subject(t *testing.T) {
// Matches.
for desc, tc := range map[string]struct { for desc, tc := range map[string]struct {
evt EventPayloadCheckServiceNode evt EventPayloadCheckServiceNode
req stream.SubscribeRequest sub string
}{ }{
"default partition and namespace": { "default partition and namespace": {
EventPayloadCheckServiceNode{ EventPayloadCheckServiceNode{
@ -30,10 +29,7 @@ func TestEventPayloadCheckServiceNode_SubjectMatchesRequests(t *testing.T) {
}, },
}, },
}, },
stream.SubscribeRequest{ "default/default/foo",
Key: "foo",
EnterpriseMeta: structs.EnterpriseMeta{},
},
}, },
"mixed casing": { "mixed casing": {
EventPayloadCheckServiceNode{ EventPayloadCheckServiceNode{
@ -43,7 +39,7 @@ func TestEventPayloadCheckServiceNode_SubjectMatchesRequests(t *testing.T) {
}, },
}, },
}, },
stream.SubscribeRequest{Key: "foo"}, "default/default/foo",
}, },
"override key": { "override key": {
EventPayloadCheckServiceNode{ EventPayloadCheckServiceNode{
@ -54,60 +50,11 @@ func TestEventPayloadCheckServiceNode_SubjectMatchesRequests(t *testing.T) {
}, },
overrideKey: "bar", overrideKey: "bar",
}, },
stream.SubscribeRequest{Key: "bar"}, "default/default/bar",
}, },
} { } {
t.Run(desc, func(t *testing.T) { t.Run(desc, func(t *testing.T) {
require.Equal(t, tc.req.Subject(), tc.evt.Subject()) require.Equal(t, tc.sub, tc.evt.Subject().String())
})
}
// Non-matches.
for desc, tc := range map[string]struct {
evt EventPayloadCheckServiceNode
req stream.SubscribeRequest
}{
"different key": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
},
stream.SubscribeRequest{
Key: "bar",
},
},
"different partition": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
overridePartition: "bar",
},
stream.SubscribeRequest{
Key: "foo",
},
},
"different namespace": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
overrideNamespace: "bar",
},
stream.SubscribeRequest{
Key: "foo",
},
},
} {
t.Run(desc, func(t *testing.T) {
require.NotEqual(t, tc.req.Subject(), tc.evt.Subject())
}) })
} }
} }
@ -125,7 +72,7 @@ func TestServiceHealthSnapshot(t *testing.T) {
fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealth) fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealth)
buf := &snapshotAppender{} buf := &snapshotAppender{}
req := stream.SubscribeRequest{Key: "web"} req := stream.SubscribeRequest{Subject: EventSubjectService{Key: "web"}}
idx, err := fn(req, buf) idx, err := fn(req, buf)
require.NoError(t, err) require.NoError(t, err)
@ -202,7 +149,7 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealthConnect) fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealthConnect)
buf := &snapshotAppender{} buf := &snapshotAppender{}
req := stream.SubscribeRequest{Key: "web", Topic: topicServiceHealthConnect} req := stream.SubscribeRequest{Subject: EventSubjectService{Key: "web"}, Topic: topicServiceHealthConnect}
idx, err := fn(req, buf) idx, err := fn(req, buf)
require.NoError(t, err) require.NoError(t, err)

View File

@ -12,11 +12,13 @@ import (
// //
// Note: topics are ordinarily defined in subscribe.proto, but this one isn't // Note: topics are ordinarily defined in subscribe.proto, but this one isn't
// currently available via the Subscribe endpoint. // currently available via the Subscribe endpoint.
const EventTopicCARoots stringTopic = "CARoots" const EventTopicCARoots stringer = "CARoots"
type stringTopic string // stringer is a convenience type to turn a regular string into a fmt.Stringer
// so that it can be used as a stream.Topic or stream.Subject.
type stringer string
func (s stringTopic) String() string { return string(s) } func (s stringer) String() string { return string(s) }
type EventPayloadCARoots struct { type EventPayloadCARoots struct {
CARoots structs.CARoots CARoots structs.CARoots
@ -25,9 +27,12 @@ type EventPayloadCARoots struct {
func (e EventPayloadCARoots) Subject() stream.Subject { return stream.SubjectNone } func (e EventPayloadCARoots) Subject() stream.Subject { return stream.SubjectNone }
func (e EventPayloadCARoots) HasReadPermission(authz acl.Authorizer) bool { func (e EventPayloadCARoots) HasReadPermission(authz acl.Authorizer) bool {
// TODO(agentless): implement this method once the Authorizer exposes a method // Require `service:write` on any service in any partition and namespace.
// to check for `service:write` on any service. var authzContext acl.AuthorizerContext
panic("EventPayloadCARoots does not implement HasReadPermission") structs.WildcardEnterpriseMetaInPartition(structs.WildcardSpecifier).
FillAuthzContext(&authzContext)
return authz.ServiceWriteAny(&authzContext) == acl.Allow
} }
// caRootsChangeEvents returns an event on EventTopicCARoots whenever the list // caRootsChangeEvents returns an event on EventTopicCARoots whenever the list

View File

@ -5,6 +5,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -93,3 +94,25 @@ func TestCARootsSnapshot(t *testing.T) {
}) })
}) })
} }
func TestEventPayloadCARoots_HasReadPermission(t *testing.T) {
t.Run("no service:write", func(t *testing.T) {
hasRead := EventPayloadCARoots{}.HasReadPermission(acl.DenyAll())
require.False(t, hasRead)
})
t.Run("has service:write", func(t *testing.T) {
policy, err := acl.NewPolicyFromSource(`
service "foo" {
policy = "write"
}
`, acl.SyntaxCurrent, nil, nil)
require.NoError(t, err)
authz, err := acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), []*acl.Policy{policy}, nil)
require.NoError(t, err)
hasRead := EventPayloadCARoots{}.HasReadPermission(authz)
require.True(t, hasRead)
})
}

View File

@ -276,8 +276,11 @@ func (s *Store) AbandonCh() <-chan struct{} {
// Abandon is used to signal that the given state store has been abandoned. // Abandon is used to signal that the given state store has been abandoned.
// Calling this more than one time will panic. // Calling this more than one time will panic.
func (s *Store) Abandon() { func (s *Store) Abandon() {
s.stopEventPublisher() // Note: the order of these operations matters. Subscribers may receive on
// abandonCh to determine whether their subscription was closed because the
// store was abandoned, therefore it's important abandonCh is closed first.
close(s.abandonCh) close(s.abandonCh)
s.stopEventPublisher()
} }
// maxIndex is a helper used to retrieve the highest known index // maxIndex is a helper used to retrieve the highest known index

View File

@ -25,9 +25,9 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
// Register the subscription. // Register the subscription.
subscription := &stream.SubscribeRequest{ subscription := &stream.SubscribeRequest{
Topic: topicService, Topic: topicService,
Key: "nope", Subject: stringer("nope"),
Token: token.SecretID, Token: token.SecretID,
} }
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
@ -71,9 +71,9 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
// Register another subscription. // Register another subscription.
subscription2 := &stream.SubscribeRequest{ subscription2 := &stream.SubscribeRequest{
Topic: topicService, Topic: topicService,
Key: "nope", Subject: stringer("nope"),
Token: token.SecretID, Token: token.SecretID,
} }
sub2, err := publisher.Subscribe(subscription2) sub2, err := publisher.Subscribe(subscription2)
require.NoError(t, err) require.NoError(t, err)
@ -112,9 +112,9 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
// Register the subscription. // Register the subscription.
subscription := &stream.SubscribeRequest{ subscription := &stream.SubscribeRequest{
Topic: topicService, Topic: topicService,
Key: "nope", Subject: stringer("nope"),
Token: token.SecretID, Token: token.SecretID,
} }
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
@ -162,9 +162,9 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
// Register another subscription. // Register another subscription.
subscription2 := &stream.SubscribeRequest{ subscription2 := &stream.SubscribeRequest{
Topic: topicService, Topic: topicService,
Key: "nope", Subject: stringer("nope"),
Token: token.SecretID, Token: token.SecretID,
} }
sub, err = publisher.Subscribe(subscription2) sub, err = publisher.Subscribe(subscription2)
require.NoError(t, err) require.NoError(t, err)
@ -191,9 +191,9 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
// Register another subscription. // Register another subscription.
subscription3 := &stream.SubscribeRequest{ subscription3 := &stream.SubscribeRequest{
Topic: topicService, Topic: topicService,
Key: "nope", Subject: stringer("nope"),
Token: token.SecretID, Token: token.SecretID,
} }
sub, err = publisher.Subscribe(subscription3) sub, err = publisher.Subscribe(subscription3)
require.NoError(t, err) require.NoError(t, err)
@ -233,9 +233,9 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
// Register the subscription. // Register the subscription.
subscription := &stream.SubscribeRequest{ subscription := &stream.SubscribeRequest{
Topic: topicService, Topic: topicService,
Key: "nope", Subject: stringer("nope"),
Token: token.SecretID, Token: token.SecretID,
} }
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
@ -278,9 +278,9 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
// Register another subscription. // Register another subscription.
subscription2 := &stream.SubscribeRequest{ subscription2 := &stream.SubscribeRequest{
Topic: topicService, Topic: topicService,
Key: "nope", Subject: stringer("nope"),
Token: token.SecretID, Token: token.SecretID,
} }
sub, err = publisher.Subscribe(subscription2) sub, err = publisher.Subscribe(subscription2)
require.NoError(t, err) require.NoError(t, err)
@ -396,7 +396,9 @@ var topicService topic = "test-topic-service"
func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
return stream.SnapshotHandlers{ return stream.SnapshotHandlers{
topicService: func(req stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) { topicService: func(req stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) key := req.Subject.String()
idx, nodes, err := s.ServiceNodes(nil, key, nil)
if err != nil { if err != nil {
return idx, err return idx, err
} }
@ -405,7 +407,7 @@ func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
event := stream.Event{ event := stream.Event{
Topic: req.Topic, Topic: req.Topic,
Index: node.ModifyIndex, Index: node.ModifyIndex,
Payload: nodePayload{node: node, key: req.Key}, Payload: nodePayload{node: node, key: key},
} }
snap.Append([]stream.Event{event}) snap.Append([]stream.Event{event})
} }
@ -424,7 +426,7 @@ func (p nodePayload) HasReadPermission(acl.Authorizer) bool {
} }
func (p nodePayload) Subject() stream.Subject { func (p nodePayload) Subject() stream.Subject {
return stream.Subject(p.node.PartitionOrDefault() + "/" + p.node.NamespaceOrDefault() + "/" + p.key) return stringer(p.key)
} }
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
@ -451,9 +453,9 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
// so we know the initial token write event has been sent out before // so we know the initial token write event has been sent out before
// continuing... // continuing...
req := &stream.SubscribeRequest{ req := &stream.SubscribeRequest{
Topic: topicService, Topic: topicService,
Key: "nope", Subject: stringer("nope"),
Token: token.SecretID, Token: token.SecretID,
} }
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()

View File

@ -17,12 +17,16 @@ type Topic fmt.Stringer
// Subject identifies a portion of a topic for which a subscriber wishes to // Subject identifies a portion of a topic for which a subscriber wishes to
// receive events (e.g. health events for a particular service) usually the // receive events (e.g. health events for a particular service) usually the
// normalized resource name (including partition and namespace if applicable). // normalized resource name (including partition and namespace if applicable).
type Subject string type Subject fmt.Stringer
// SubjectNone is used when all events on a given topic are "global" and not // SubjectNone is used when all events on a given topic are "global" and not
// further partitioned by subject. For example: the "CA Roots" topic which is // further partitioned by subject. For example: the "CA Roots" topic which is
// used to notify subscribers when the global set CA root certificates changes. // used to notify subscribers when the global set CA root certificates changes.
const SubjectNone Subject = "none" const SubjectNone stringer = "none"
type stringer string
func (s stringer) String() string { return string(s) }
// Event is a structure with identifiers and a payload. Events are Published to // Event is a structure with identifiers and a payload. Events are Published to
// EventPublisher and returned to Subscribers. // EventPublisher and returned to Subscribers.
@ -123,6 +127,12 @@ func (e Event) IsNewSnapshotToFollow() bool {
return e.Payload == newSnapshotToFollow{} return e.Payload == newSnapshotToFollow{}
} }
// IsFramingEvent returns true if this is a framing event (e.g. EndOfSnapshot
// or NewSnapshotToFollow).
func (e Event) IsFramingEvent() bool {
return e.IsEndOfSnapshot() || e.IsNewSnapshotToFollow()
}
type framingEvent struct{} type framingEvent struct{}
func (framingEvent) HasReadPermission(acl.Authorizer) bool { func (framingEvent) HasReadPermission(acl.Authorizer) bool {

View File

@ -44,8 +44,8 @@ type EventPublisher struct {
// topicSubject is used as a map key when accessing topic buffers and cached // topicSubject is used as a map key when accessing topic buffers and cached
// snapshots. // snapshots.
type topicSubject struct { type topicSubject struct {
Topic Topic Topic string
Subject Subject Subject string
} }
type subscriptions struct { type subscriptions struct {
@ -138,7 +138,10 @@ func (e *EventPublisher) publishEvent(events []Event) {
continue continue
} }
groupKey := topicSubject{event.Topic, event.Payload.Subject()} groupKey := topicSubject{
Topic: event.Topic.String(),
Subject: event.Payload.Subject().String(),
}
groupedEvents[groupKey] = append(groupedEvents[groupKey], event) groupedEvents[groupKey] = append(groupedEvents[groupKey], event)
} }

View File

@ -21,8 +21,8 @@ var testTopic Topic = intTopic(999)
func TestEventPublisher_SubscribeWithIndex0(t *testing.T) { func TestEventPublisher_SubscribeWithIndex0(t *testing.T) {
req := &SubscribeRequest{ req := &SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "sub-key", Subject: stringer("sub-key"),
} }
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
@ -81,7 +81,7 @@ func (p simplePayload) HasReadPermission(acl.Authorizer) bool {
return !p.noReadPerm return !p.noReadPerm
} }
func (p simplePayload) Subject() Subject { return Subject("default/default/" + p.key) } func (p simplePayload) Subject() Subject { return stringer(p.key) }
func newTestSnapshotHandlers() SnapshotHandlers { func newTestSnapshotHandlers() SnapshotHandlers {
return SnapshotHandlers{ return SnapshotHandlers{
@ -153,11 +153,11 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) {
publisher := NewEventPublisher(handlers, time.Second) publisher := NewEventPublisher(handlers, time.Second)
go publisher.Run(ctx) go publisher.Run(ctx)
sub1, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(22)}) sub1, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(22), Subject: SubjectNone})
require.NoError(t, err) require.NoError(t, err)
defer sub1.Unsubscribe() defer sub1.Unsubscribe()
sub2, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(33)}) sub2, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(33), Subject: SubjectNone})
require.NoError(t, err) require.NoError(t, err)
defer sub2.Unsubscribe() defer sub2.Unsubscribe()
@ -184,8 +184,8 @@ func consumeSub(ctx context.Context, sub *Subscription) error {
func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) { func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
req := &SubscribeRequest{ req := &SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "sub-key", Subject: stringer("sub-key"),
} }
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
@ -229,8 +229,8 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) { func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
req := &SubscribeRequest{ req := &SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "sub-key", Subject: stringer("sub-key"),
} }
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
@ -282,8 +282,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) { func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) {
req := &SubscribeRequest{ req := &SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "sub-key", Subject: stringer("sub-key"),
} }
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
@ -338,8 +338,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) {
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testing.T) { func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testing.T) {
req := &SubscribeRequest{ req := &SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "sub-key", Subject: stringer("sub-key"),
} }
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
@ -406,9 +406,9 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testing.T) { func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testing.T) {
req := &SubscribeRequest{ req := &SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "sub-key", Subject: stringer("sub-key"),
Index: 1, Index: 1,
} }
nextEvent := Event{ nextEvent := Event{
@ -492,8 +492,8 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) { func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) {
req := &SubscribeRequest{ req := &SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "sub-key", Subject: stringer("sub-key"),
} }
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() defer cancel()
@ -514,8 +514,8 @@ func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) {
func TestEventPublisher_Unsubscribe_FreesResourcesWhenThereAreNoSubscribers(t *testing.T) { func TestEventPublisher_Unsubscribe_FreesResourcesWhenThereAreNoSubscribers(t *testing.T) {
req := &SubscribeRequest{ req := &SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "sub-key", Subject: stringer("sub-key"),
} }
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second) publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)

View File

@ -4,10 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strings"
"sync/atomic" "sync/atomic"
"github.com/hashicorp/consul/agent/structs"
) )
const ( const (
@ -54,37 +51,32 @@ type Subscription struct {
} }
// SubscribeRequest identifies the types of events the subscriber would like to // SubscribeRequest identifies the types of events the subscriber would like to
// receiver. Topic and Token are required. // receive. Topic, Subject, and Token are required.
type SubscribeRequest struct { type SubscribeRequest struct {
// Topic to subscribe to // Topic to subscribe to (e.g. service health).
Topic Topic Topic Topic
// Key used to filter events in the topic. Only events matching the key will
// be returned by the subscription. A blank key will return all events. Key // Subject identifies the subset of Topic events the subscriber wishes to
// is generally the name of the resource. // receive (e.g. events for a specific service). SubjectNone may be provided
Key string // if all events on the given topic are "global" and not further partitioned
// EnterpriseMeta is used to filter events in the topic. Only events matching // by subject.
// the partition and namespace will be returned by the subscription. Subject Subject
EnterpriseMeta structs.EnterpriseMeta
// Token that was used to authenticate the request. If any ACL policy // Token that was used to authenticate the request. If any ACL policy
// changes impact the token the subscription will be forcefully closed. // changes impact the token the subscription will be forcefully closed.
Token string Token string
// Index is the last index the client received. If non-zero the // Index is the last index the client received. If non-zero the
// subscription will be resumed from this index. If the index is out-of-date // subscription will be resumed from this index. If the index is out-of-date
// a NewSnapshotToFollow event will be sent. // a NewSnapshotToFollow event will be sent.
Index uint64 Index uint64
} }
func (req SubscribeRequest) Subject() Subject {
var (
partition = req.EnterpriseMeta.PartitionOrDefault()
namespace = req.EnterpriseMeta.NamespaceOrDefault()
key = strings.ToLower(req.Key)
)
return Subject(partition + "/" + namespace + "/" + key)
}
func (req SubscribeRequest) topicSubject() topicSubject { func (req SubscribeRequest) topicSubject() topicSubject {
return topicSubject{req.Topic, req.Subject()} return topicSubject{
Topic: req.Topic.String(),
Subject: req.Subject.String(),
}
} }
// newSubscription return a new subscription. The caller is responsible for // newSubscription return a new subscription. The caller is responsible for

View File

@ -6,32 +6,10 @@ import (
time "time" time "time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
) )
func noopUnSub() {} func noopUnSub() {}
func TestSubscription_Subject(t *testing.T) {
for desc, tc := range map[string]struct {
req SubscribeRequest
sub Subject
}{
"default partition and namespace": {
SubscribeRequest{Key: "foo", EnterpriseMeta: structs.EnterpriseMeta{}},
"default/default/foo",
},
"mixed casing": {
SubscribeRequest{Key: "BaZ"},
"default/default/baz",
},
} {
t.Run(desc, func(t *testing.T) {
require.Equal(t, tc.sub, tc.req.Subject())
})
}
}
func TestSubscription(t *testing.T) { func TestSubscription(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
@ -50,8 +28,8 @@ func TestSubscription(t *testing.T) {
defer cancel() defer cancel()
req := SubscribeRequest{ req := SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "test", Subject: stringer("test"),
} }
sub := newSubscription(req, startHead, noopUnSub) sub := newSubscription(req, startHead, noopUnSub)
@ -124,8 +102,8 @@ func TestSubscription_Close(t *testing.T) {
defer cancel() defer cancel()
req := SubscribeRequest{ req := SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "test", Subject: stringer("test"),
} }
sub := newSubscription(req, startHead, noopUnSub) sub := newSubscription(req, startHead, noopUnSub)

View File

@ -93,11 +93,13 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta structs.EnterpriseMeta) *stream.SubscribeRequest { func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta structs.EnterpriseMeta) *stream.SubscribeRequest {
return &stream.SubscribeRequest{ return &stream.SubscribeRequest{
Topic: req.Topic, Topic: req.Topic,
Key: req.Key, Subject: state.EventSubjectService{
EnterpriseMeta: entMeta, Key: req.Key,
Token: req.Token, EnterpriseMeta: entMeta,
Index: req.Index, },
Token: req.Token,
Index: req.Index,
} }
} }

View File

@ -3,13 +3,12 @@ package subscribe
import ( import (
"context" "context"
"errors" "errors"
"github.com/golang/protobuf/ptypes/duration"
"github.com/hashicorp/consul/proto/pbcommon"
"io" "io"
"net" "net"
"testing" "testing"
"time" "time"
"github.com/golang/protobuf/ptypes/duration"
"github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
@ -25,6 +24,7 @@ import (
grpc "github.com/hashicorp/consul/agent/grpc/private" grpc "github.com/hashicorp/consul/agent/grpc/private"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/proto/prototest" "github.com/hashicorp/consul/proto/prototest"
@ -1106,7 +1106,7 @@ func newEventFromSubscription(t *testing.T, index uint64) stream.Event {
}, },
} }
ep := stream.NewEventPublisher(handlers, 0) ep := stream.NewEventPublisher(handlers, 0)
req := &stream.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealthConnect, Index: index} req := &stream.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealthConnect, Subject: stream.SubjectNone, Index: index}
sub, err := ep.Subscribe(req) sub, err := ep.Subscribe(req)
require.NoError(t, err) require.NoError(t, err)

View File

@ -0,0 +1,27 @@
package connectca
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
)
// testAuthorizer returns an ACL policy authorizer with `service:write` on an
// arbitrary service.
func testAuthorizer(t *testing.T) acl.Authorizer {
t.Helper()
policy, err := acl.NewPolicyFromSource(`
service "foo" {
policy = "write"
}
`, acl.SyntaxCurrent, nil, nil)
require.NoError(t, err)
authz, err := acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), []*acl.Policy{policy}, nil)
require.NoError(t, err)
return authz
}

View File

@ -0,0 +1,38 @@
// Code generated by mockery v1.0.0. DO NOT EDIT.
package connectca
import (
acl "github.com/hashicorp/consul/acl"
mock "github.com/stretchr/testify/mock"
structs "github.com/hashicorp/consul/agent/structs"
)
// MockACLResolver is an autogenerated mock type for the ACLResolver type
type MockACLResolver struct {
mock.Mock
}
// ResolveTokenAndDefaultMeta provides a mock function with given fields: _a0, _a1, _a2
func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *structs.EnterpriseMeta, _a2 *acl.AuthorizerContext) (acl.Authorizer, error) {
ret := _m.Called(_a0, _a1, _a2)
var r0 acl.Authorizer
if rf, ok := ret.Get(0).(func(string, *structs.EnterpriseMeta, *acl.AuthorizerContext) acl.Authorizer); ok {
r0 = rf(_a0, _a1, _a2)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(acl.Authorizer)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string, *structs.EnterpriseMeta, *acl.AuthorizerContext) error); ok {
r1 = rf(_a0, _a1, _a2)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

View File

@ -0,0 +1,42 @@
package connectca
import (
"google.golang.org/grpc"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto-public/pbconnectca"
)
type Server struct {
Config
}
type Config struct {
GetStore func() StateStore
Logger hclog.Logger
ACLResolver ACLResolver
}
type StateStore interface {
EventPublisher() state.EventPublisher
CAConfig(memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
AbandonCh() <-chan struct{}
}
//go:generate mockery -name ACLResolver -inpkg
type ACLResolver interface {
ResolveTokenAndDefaultMeta(string, *structs.EnterpriseMeta, *acl.AuthorizerContext) (acl.Authorizer, error)
}
func NewServer(cfg Config) *Server {
return &Server{cfg}
}
func (s *Server) Register(grpcServer *grpc.Server) {
pbconnectca.RegisterConnectCAServiceServer(grpcServer, s)
}

View File

@ -0,0 +1,52 @@
package connectca
import (
"context"
"net"
"testing"
"time"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/proto-public/pbconnectca"
)
func testStateStore(t *testing.T) *state.Store {
t.Helper()
gc, err := state.NewTombstoneGC(time.Second, time.Millisecond)
require.NoError(t, err)
return state.NewStateStoreWithEventPublisher(gc)
}
func testClient(t *testing.T, server *Server) pbconnectca.ConnectCAServiceClient {
t.Helper()
addr := runTestServer(t, server)
conn, err := grpc.DialContext(context.Background(), addr.String(), grpc.WithInsecure())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, conn.Close())
})
return pbconnectca.NewConnectCAServiceClient(conn)
}
func runTestServer(t *testing.T, server *Server) net.Addr {
t.Helper()
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
grpcServer := grpc.NewServer()
server.Register(grpcServer)
go grpcServer.Serve(lis)
t.Cleanup(grpcServer.Stop)
return lis.Addr()
}

View File

@ -0,0 +1,202 @@
package connectca
import (
"context"
"errors"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/grpc/public"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto-public/pbconnectca"
)
// WatchRoots provides a stream on which you can receive the list of active
// Connect CA roots. Current roots are sent immediately at the start of the
// stream, and new lists will be sent whenever the roots are rotated.
func (s *Server) WatchRoots(_ *emptypb.Empty, serverStream pbconnectca.ConnectCAService_WatchRootsServer) error {
logger := s.Logger.Named("watch-roots").With("stream_id", streamID())
logger.Trace("starting stream")
defer logger.Trace("stream closed")
token := public.TokenFromContext(serverStream.Context())
// Serve the roots from an EventPublisher subscription. If the subscription is
// closed due to an ACL change, we'll attempt to re-authorize and resume it to
// prevent unnecessarily terminating the stream.
var idx uint64
for {
var err error
idx, err = s.serveRoots(token, idx, serverStream, logger)
if errors.Is(err, stream.ErrSubForceClosed) {
logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt to re-auth and resume")
} else {
return err
}
}
}
func (s *Server) serveRoots(
token string,
idx uint64,
serverStream pbconnectca.ConnectCAService_WatchRootsServer,
logger hclog.Logger,
) (uint64, error) {
if err := s.authorize(token); err != nil {
return 0, err
}
store := s.GetStore()
// Read the TrustDomain up front - we do not allow users to change the ClusterID
// so reading it once at the beginning of the stream is sufficient.
trustDomain, err := getTrustDomain(store, logger)
if err != nil {
return 0, err
}
// Start the subscription.
sub, err := store.EventPublisher().Subscribe(&stream.SubscribeRequest{
Topic: state.EventTopicCARoots,
Subject: stream.SubjectNone,
Token: token,
Index: idx,
})
if err != nil {
logger.Error("failed to subscribe to CA Roots events", "error", err)
return 0, status.Error(codes.Internal, "failed to subscribe to CA Roots events")
}
defer sub.Unsubscribe()
for {
event, err := sub.Next(serverStream.Context())
switch {
case errors.Is(err, stream.ErrSubForceClosed):
// If the subscription was closed because the state store was abandoned (e.g.
// following a snapshot restore) reset idx to ensure we don't skip over the
// new store's events.
select {
case <-store.AbandonCh():
idx = 0
default:
}
return idx, err
case errors.Is(err, context.Canceled):
return 0, nil
case err != nil:
logger.Error("failed to read next event", "error", err)
return idx, status.Error(codes.Internal, err.Error())
}
// Note: this check isn't strictly necessary because the event publishing
// machinery will ensure the index increases monotonically, but it can be
// tricky to faithfully reproduce this in tests (e.g. the EventPublisher
// garbage collects topic buffers and snapshots aggressively when streams
// disconnect) so this avoids a bunch of confusing setup code.
if event.Index <= idx {
continue
}
idx = event.Index
// We do not send framing events (e.g. EndOfSnapshot, NewSnapshotToFollow)
// because we send a full list of roots on every event, rather than expecting
// clients to maintain a state-machine in the way they do for service health.
if event.IsFramingEvent() {
continue
}
rsp, err := eventToResponse(event, trustDomain)
if err != nil {
logger.Error("failed to convert event to response", "error", err)
return idx, status.Error(codes.Internal, err.Error())
}
if err := serverStream.Send(rsp); err != nil {
logger.Error("failed to send response", "error", err)
return idx, err
}
}
}
func eventToResponse(event stream.Event, trustDomain string) (*pbconnectca.WatchRootsResponse, error) {
payload, ok := event.Payload.(state.EventPayloadCARoots)
if !ok {
return nil, fmt.Errorf("unexpected event payload type: %T", payload)
}
var active string
roots := make([]*pbconnectca.CARoot, 0)
for _, root := range payload.CARoots {
if root.Active {
active = root.ID
}
roots = append(roots, &pbconnectca.CARoot{
Id: root.ID,
Name: root.Name,
SerialNumber: root.SerialNumber,
SigningKeyId: root.SigningKeyID,
RootCert: root.RootCert,
IntermediateCerts: root.IntermediateCerts,
Active: root.Active,
RotatedOutAt: timestamppb.New(root.RotatedOutAt),
})
}
return &pbconnectca.WatchRootsResponse{
TrustDomain: trustDomain,
ActiveRootId: active,
Roots: roots,
}, nil
}
func (s *Server) authorize(token string) error {
// Require the given ACL token to have `service:write` on any service (in any
// partition and namespace).
var authzContext acl.AuthorizerContext
entMeta := structs.WildcardEnterpriseMetaInPartition(structs.WildcardSpecifier)
authz, err := s.ACLResolver.ResolveTokenAndDefaultMeta(token, entMeta, &authzContext)
if err != nil {
return status.Error(codes.Unauthenticated, err.Error())
}
if err := authz.ToAllowAuthorizer().ServiceWriteAnyAllowed(&authzContext); err != nil {
return status.Error(codes.PermissionDenied, err.Error())
}
return nil
}
// We tag logs with a unique identifier to ease debugging. In the future this
// should probably be an Open Telemetry trace ID.
func streamID() string {
id, err := uuid.GenerateUUID()
if err != nil {
return ""
}
return id
}
func getTrustDomain(store StateStore, logger hclog.Logger) (string, error) {
_, cfg, err := store.CAConfig(nil)
switch {
case err != nil:
logger.Error("failed to read Connect CA Config", "error", err)
return "", status.Error(codes.Internal, "failed to read Connect CA Config")
case cfg == nil:
logger.Warn("cannot begin stream because Connect CA is not yet initialized")
return "", status.Error(codes.FailedPrecondition, "Connect CA is not yet initialized")
}
return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil
}

View File

@ -0,0 +1,280 @@
package connectca
import (
"context"
"errors"
"io"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/grpc/public"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto-public/pbconnectca"
)
const testACLToken = "acl-token"
func TestWatchRoots_Success(t *testing.T) {
store := testStateStore(t)
// Set the initial roots and CA configuration.
rootA := connect.TestCA(t, nil)
_, err := store.CARootSetCAS(1, 0, structs.CARoots{rootA})
require.NoError(t, err)
err = store.CASetConfig(0, &structs.CAConfiguration{ClusterID: "cluster-id"})
require.NoError(t, err)
// Mock the ACL Resolver to return an authorizer with `service:write`.
aclResolver := &MockACLResolver{}
aclResolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything).
Return(testAuthorizer(t), nil)
ctx := public.ContextWithToken(context.Background(), testACLToken)
server := NewServer(Config{
GetStore: func() StateStore { return store },
Logger: hclog.NewNullLogger(),
ACLResolver: aclResolver,
})
// Begin the stream.
client := testClient(t, server)
stream, err := client.WatchRoots(ctx, &emptypb.Empty{})
require.NoError(t, err)
rspCh := handleRootsStream(t, stream)
// Expect an initial message containing current roots (provided by the snapshot).
roots := mustGetRoots(t, rspCh)
require.Equal(t, "cluster-id.consul", roots.TrustDomain)
require.Equal(t, rootA.ID, roots.ActiveRootId)
require.Len(t, roots.Roots, 1)
require.Equal(t, rootA.ID, roots.Roots[0].Id)
// Rotate the roots.
rootB := connect.TestCA(t, nil)
_, err = store.CARootSetCAS(2, 1, structs.CARoots{rootB})
require.NoError(t, err)
// Expect another event containing the new roots.
roots = mustGetRoots(t, rspCh)
require.Equal(t, "cluster-id.consul", roots.TrustDomain)
require.Equal(t, rootB.ID, roots.ActiveRootId)
require.Len(t, roots.Roots, 1)
require.Equal(t, rootB.ID, roots.Roots[0].Id)
}
func TestWatchRoots_InvalidACLToken(t *testing.T) {
store := testStateStore(t)
// Set the initial CA configuration.
err := store.CASetConfig(0, &structs.CAConfiguration{ClusterID: "cluster-id"})
require.NoError(t, err)
// Mock the ACL resolver to return ErrNotFound.
aclResolver := &MockACLResolver{}
aclResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
Return(nil, acl.ErrNotFound)
ctx := public.ContextWithToken(context.Background(), testACLToken)
server := NewServer(Config{
GetStore: func() StateStore { return store },
Logger: hclog.NewNullLogger(),
ACLResolver: aclResolver,
})
// Start the stream.
client := testClient(t, server)
stream, err := client.WatchRoots(ctx, &emptypb.Empty{})
require.NoError(t, err)
rspCh := handleRootsStream(t, stream)
// Expect to get an Unauthenticated error immediately.
err = mustGetError(t, rspCh)
require.Equal(t, codes.Unauthenticated.String(), status.Code(err).String())
}
func TestWatchRoots_ACLTokenInvalidated(t *testing.T) {
store := testStateStore(t)
// Set the initial roots and CA configuration.
rootA := connect.TestCA(t, nil)
_, err := store.CARootSetCAS(1, 0, structs.CARoots{rootA})
require.NoError(t, err)
err = store.CASetConfig(2, &structs.CAConfiguration{ClusterID: "cluster-id"})
require.NoError(t, err)
// Mock the ACL Resolver to return an authorizer with `service:write` the
// first two times it is called (initial connect and first re-auth).
aclResolver := &MockACLResolver{}
aclResolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything).
Return(testAuthorizer(t), nil).Twice()
ctx := public.ContextWithToken(context.Background(), testACLToken)
server := NewServer(Config{
GetStore: func() StateStore { return store },
Logger: hclog.NewNullLogger(),
ACLResolver: aclResolver,
})
// Start the stream.
client := testClient(t, server)
stream, err := client.WatchRoots(ctx, &emptypb.Empty{})
require.NoError(t, err)
rspCh := handleRootsStream(t, stream)
// Consume the initial response.
mustGetRoots(t, rspCh)
// Update the ACL token to cause the subscription to be force-closed.
accessorID, err := uuid.GenerateUUID()
require.NoError(t, err)
err = store.ACLTokenSet(1, &structs.ACLToken{
AccessorID: accessorID,
SecretID: testACLToken,
})
require.NoError(t, err)
// Update the roots.
rootB := connect.TestCA(t, nil)
_, err = store.CARootSetCAS(3, 1, structs.CARoots{rootB})
require.NoError(t, err)
// Expect the stream to remain open and to receive the new roots.
mustGetRoots(t, rspCh)
// Simulate removing the `service:write` permission.
aclResolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything).
Return(acl.DenyAll(), nil)
// Update the ACL token to cause the subscription to be force-closed.
err = store.ACLTokenSet(1, &structs.ACLToken{
AccessorID: accessorID,
SecretID: testACLToken,
})
require.NoError(t, err)
// Expect the stream to be terminated.
err = mustGetError(t, rspCh)
require.Equal(t, codes.PermissionDenied.String(), status.Code(err).String())
}
func TestWatchRoots_StateStoreAbandoned(t *testing.T) {
storeA := testStateStore(t)
// Set the initial roots and CA configuration.
rootA := connect.TestCA(t, nil)
_, err := storeA.CARootSetCAS(1, 0, structs.CARoots{rootA})
require.NoError(t, err)
err = storeA.CASetConfig(0, &structs.CAConfiguration{ClusterID: "cluster-a"})
require.NoError(t, err)
// Mock the ACL Resolver to return an authorizer with `service:write`.
aclResolver := &MockACLResolver{}
aclResolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything).
Return(testAuthorizer(t), nil)
ctx := public.ContextWithToken(context.Background(), testACLToken)
server := NewServer(Config{
GetStore: func() StateStore { return storeA },
Logger: hclog.NewNullLogger(),
ACLResolver: aclResolver,
})
// Begin the stream.
client := testClient(t, server)
stream, err := client.WatchRoots(ctx, &emptypb.Empty{})
require.NoError(t, err)
rspCh := handleRootsStream(t, stream)
// Consume the initial roots.
mustGetRoots(t, rspCh)
// Simulate a snapshot restore.
storeB := testStateStore(t)
rootB := connect.TestCA(t, nil)
_, err = storeB.CARootSetCAS(1, 0, structs.CARoots{rootB})
require.NoError(t, err)
err = storeB.CASetConfig(0, &structs.CAConfiguration{ClusterID: "cluster-b"})
require.NoError(t, err)
server.GetStore = func() StateStore { return storeB }
storeA.Abandon()
// Expect to get the new store's roots.
newRoots := mustGetRoots(t, rspCh)
require.Equal(t, "cluster-b.consul", newRoots.TrustDomain)
require.Len(t, newRoots.Roots, 1)
require.Equal(t, rootB.ID, newRoots.ActiveRootId)
}
func mustGetRoots(t *testing.T, ch <-chan rootsOrError) *pbconnectca.WatchRootsResponse {
t.Helper()
select {
case rsp := <-ch:
require.NoError(t, rsp.err)
return rsp.rsp
case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for WatchRootsResponse")
return nil
}
}
func mustGetError(t *testing.T, ch <-chan rootsOrError) error {
t.Helper()
select {
case rsp := <-ch:
require.Error(t, rsp.err)
return rsp.err
case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for WatchRootsResponse")
return nil
}
}
func handleRootsStream(t *testing.T, stream pbconnectca.ConnectCAService_WatchRootsClient) <-chan rootsOrError {
t.Helper()
rspCh := make(chan rootsOrError)
go func() {
for {
rsp, err := stream.Recv()
if errors.Is(err, io.EOF) ||
errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) {
return
}
rspCh <- rootsOrError{
rsp: rsp,
err: err,
}
}
}()
return rspCh
}
type rootsOrError struct {
rsp *pbconnectca.WatchRootsResponse
err error
}

View File

@ -0,0 +1,28 @@
package public
import (
"context"
"google.golang.org/grpc/metadata"
)
const metadataKeyToken = "x-consul-token"
// TokenFromContext returns the ACL token in the gRPC metadata attached to the
// given context.
func TokenFromContext(ctx context.Context) string {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ""
}
toks, ok := md[metadataKeyToken]
if ok && len(toks) > 0 {
return toks[0]
}
return ""
}
// ContextWithToken returns a context with the given ACL token attached.
func ContextWithToken(ctx context.Context, token string) context.Context {
return metadata.AppendToOutgoingContext(ctx, metadataKeyToken, token)
}

View File

@ -37,9 +37,9 @@ func TestStore_IntegrationWithBackend(t *testing.T) {
var maxIndex uint64 = 200 var maxIndex uint64 = 200
count := &counter{latest: 3} count := &counter{latest: 3}
producers := map[string]*eventProducer{ producers := map[string]*eventProducer{
"srv1": newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv1", count, maxIndex), state.EventSubjectService{Key: "srv1"}.String(): newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv1", count, maxIndex),
"srv2": newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv2", count, maxIndex), state.EventSubjectService{Key: "srv2"}.String(): newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv2", count, maxIndex),
"srv3": newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv3", count, maxIndex), state.EventSubjectService{Key: "srv3"}.String(): newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv3", count, maxIndex),
} }
sh := snapshotHandler{producers: producers} sh := snapshotHandler{producers: producers}
@ -88,7 +88,7 @@ func TestStore_IntegrationWithBackend(t *testing.T) {
t.Run(fmt.Sprintf("consumer %d", i), func(t *testing.T) { t.Run(fmt.Sprintf("consumer %d", i), func(t *testing.T) {
require.True(t, len(consumer.states) > 2, "expected more than %d events", len(consumer.states)) require.True(t, len(consumer.states) > 2, "expected more than %d events", len(consumer.states))
expected := producers[consumer.srvName].nodesByIndex expected := producers[state.EventSubjectService{Key: consumer.srvName}.String()].nodesByIndex
for idx, nodes := range consumer.states { for idx, nodes := range consumer.states {
assertDeepEqual(t, idx, expected[idx], nodes) assertDeepEqual(t, idx, expected[idx], nodes)
} }
@ -348,7 +348,7 @@ type snapshotHandler struct {
} }
func (s *snapshotHandler) Snapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) { func (s *snapshotHandler) Snapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) {
producer := s.producers[req.Key] producer := s.producers[req.Subject.String()]
producer.nodesLock.Lock() producer.nodesLock.Lock()
defer producer.nodesLock.Unlock() defer producer.nodesLock.Unlock()

View File

@ -13,10 +13,10 @@ import (
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/grpc/public"
"github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/xdscommon" "github.com/hashicorp/consul/agent/xds/xdscommon"
@ -189,18 +189,6 @@ func (s *Server) StreamAggregatedResources(stream ADSStream) error {
return errors.New("not implemented") return errors.New("not implemented")
} }
func tokenFromContext(ctx context.Context) string {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ""
}
toks, ok := md["x-consul-token"]
if ok && len(toks) > 0 {
return toks[0]
}
return ""
}
// Register the XDS server handlers to the given gRPC server. // Register the XDS server handlers to the given gRPC server.
func (s *Server) Register(srv *grpc.Server) { func (s *Server) Register(srv *grpc.Server) {
envoy_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, s) envoy_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, s)
@ -221,7 +209,7 @@ func (s *Server) authorize(ctx context.Context, cfgSnap *proxycfg.ConfigSnapshot
return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot") return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot")
} }
authz, err := s.ResolveToken(tokenFromContext(ctx)) authz, err := s.ResolveToken(public.TokenFromContext(ctx))
if acl.IsErrNotFound(err) { if acl.IsErrNotFound(err) {
return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err) return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
} else if acl.IsErrPermissionDenied(err) { } else if acl.IsErrPermissionDenied(err) {

View File

@ -0,0 +1,28 @@
// Code generated by protoc-gen-go-binary. DO NOT EDIT.
// source: proto-public/pbconnectca/ca.proto
package pbconnectca
import (
"github.com/golang/protobuf/proto"
)
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *WatchRootsResponse) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *WatchRootsResponse) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *CARoot) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *CARoot) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}

View File

@ -0,0 +1,473 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.23.0
// protoc v3.15.8
// source: proto-public/pbconnectca/ca.proto
package pbconnectca
import (
context "context"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
emptypb "google.golang.org/protobuf/types/known/emptypb"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// This is a compile-time assertion that a sufficiently up-to-date version
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
type WatchRootsResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// active_root_id is the ID of a root in Roots that is the active CA root.
// Other roots are still valid if they're in the Roots list but are in the
// process of being rotated out.
ActiveRootId string `protobuf:"bytes,1,opt,name=active_root_id,json=activeRootId,proto3" json:"active_root_id,omitempty"`
// trust_domain is the identification root for this Consul cluster. All
// certificates signed by the cluster's CA must have their identifying URI
// in this domain.
//
// This does not include the protocol (currently spiffe://) since we may
// implement other protocols in future with equivalent semantics. It should
// be compared against the "authority" section of a URI (i.e. host:port).
TrustDomain string `protobuf:"bytes,2,opt,name=trust_domain,json=trustDomain,proto3" json:"trust_domain,omitempty"`
// roots is a list of root CA certs to trust.
Roots []*CARoot `protobuf:"bytes,3,rep,name=roots,proto3" json:"roots,omitempty"`
}
func (x *WatchRootsResponse) Reset() {
*x = WatchRootsResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_public_pbconnectca_ca_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *WatchRootsResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*WatchRootsResponse) ProtoMessage() {}
func (x *WatchRootsResponse) ProtoReflect() protoreflect.Message {
mi := &file_proto_public_pbconnectca_ca_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use WatchRootsResponse.ProtoReflect.Descriptor instead.
func (*WatchRootsResponse) Descriptor() ([]byte, []int) {
return file_proto_public_pbconnectca_ca_proto_rawDescGZIP(), []int{0}
}
func (x *WatchRootsResponse) GetActiveRootId() string {
if x != nil {
return x.ActiveRootId
}
return ""
}
func (x *WatchRootsResponse) GetTrustDomain() string {
if x != nil {
return x.TrustDomain
}
return ""
}
func (x *WatchRootsResponse) GetRoots() []*CARoot {
if x != nil {
return x.Roots
}
return nil
}
type CARoot struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// id is a globally unique ID (UUID) representing this CA root.
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// name is a human-friendly name for this CA root. This value is opaque to
// Consul and is not used for anything internally.
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
// serial_number is the x509 serial number of the certificate.
SerialNumber uint64 `protobuf:"varint,3,opt,name=serial_number,json=serialNumber,proto3" json:"serial_number,omitempty"`
// signing_key_id is the connect.HexString encoded id of the public key that
// corresponds to the private key used to sign leaf certificates in the
// local datacenter.
//
// The value comes from x509.Certificate.SubjectKeyId of the local leaf
// signing cert.
//
// See https://www.rfc-editor.org/rfc/rfc3280#section-4.2.1.1 for more detail.
SigningKeyId string `protobuf:"bytes,4,opt,name=signing_key_id,json=signingKeyId,proto3" json:"signing_key_id,omitempty"`
// root_cert is the PEM-encoded public certificate.
RootCert string `protobuf:"bytes,5,opt,name=root_cert,json=rootCert,proto3" json:"root_cert,omitempty"`
// intermediate_certs is a list of PEM-encoded intermediate certs to
// attach to any leaf certs signed by this CA.
IntermediateCerts []string `protobuf:"bytes,6,rep,name=intermediate_certs,json=intermediateCerts,proto3" json:"intermediate_certs,omitempty"`
// active is true if this is the current active CA. This must only
// be true for exactly one CA.
Active bool `protobuf:"varint,7,opt,name=active,proto3" json:"active,omitempty"`
// rotated_out_at is the time at which this CA was removed from the state.
// This will only be set on roots that have been rotated out from being the
// active root.
RotatedOutAt *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=rotated_out_at,json=rotatedOutAt,proto3" json:"rotated_out_at,omitempty"`
}
func (x *CARoot) Reset() {
*x = CARoot{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_public_pbconnectca_ca_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *CARoot) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CARoot) ProtoMessage() {}
func (x *CARoot) ProtoReflect() protoreflect.Message {
mi := &file_proto_public_pbconnectca_ca_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CARoot.ProtoReflect.Descriptor instead.
func (*CARoot) Descriptor() ([]byte, []int) {
return file_proto_public_pbconnectca_ca_proto_rawDescGZIP(), []int{1}
}
func (x *CARoot) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *CARoot) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (x *CARoot) GetSerialNumber() uint64 {
if x != nil {
return x.SerialNumber
}
return 0
}
func (x *CARoot) GetSigningKeyId() string {
if x != nil {
return x.SigningKeyId
}
return ""
}
func (x *CARoot) GetRootCert() string {
if x != nil {
return x.RootCert
}
return ""
}
func (x *CARoot) GetIntermediateCerts() []string {
if x != nil {
return x.IntermediateCerts
}
return nil
}
func (x *CARoot) GetActive() bool {
if x != nil {
return x.Active
}
return false
}
func (x *CARoot) GetRotatedOutAt() *timestamppb.Timestamp {
if x != nil {
return x.RotatedOutAt
}
return nil
}
var File_proto_public_pbconnectca_ca_proto protoreflect.FileDescriptor
var file_proto_public_pbconnectca_ca_proto_rawDesc = []byte{
0x0a, 0x21, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2d, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x2f, 0x70,
0x62, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x63, 0x61, 0x2f, 0x63, 0x61, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x12, 0x09, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x63, 0x61, 0x1a, 0x1b,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f,
0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f,
0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d,
0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x86, 0x01, 0x0a,
0x12, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x6f, 0x6f, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x72, 0x6f,
0x6f, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x63, 0x74,
0x69, 0x76, 0x65, 0x52, 0x6f, 0x6f, 0x74, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x72, 0x75,
0x73, 0x74, 0x5f, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x0b, 0x74, 0x72, 0x75, 0x73, 0x74, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x12, 0x27, 0x0a, 0x05,
0x72, 0x6f, 0x6f, 0x74, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x63, 0x6f,
0x6e, 0x6e, 0x65, 0x63, 0x74, 0x63, 0x61, 0x2e, 0x43, 0x41, 0x52, 0x6f, 0x6f, 0x74, 0x52, 0x05,
0x72, 0x6f, 0x6f, 0x74, 0x73, 0x22, 0x9d, 0x02, 0x0a, 0x06, 0x43, 0x41, 0x52, 0x6f, 0x6f, 0x74,
0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64,
0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
0x6e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x5f, 0x6e,
0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x73, 0x65, 0x72,
0x69, 0x61, 0x6c, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x24, 0x0a, 0x0e, 0x73, 0x69, 0x67,
0x6e, 0x69, 0x6e, 0x67, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28,
0x09, 0x52, 0x0c, 0x73, 0x69, 0x67, 0x6e, 0x69, 0x6e, 0x67, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x12,
0x1b, 0x0a, 0x09, 0x72, 0x6f, 0x6f, 0x74, 0x5f, 0x63, 0x65, 0x72, 0x74, 0x18, 0x05, 0x20, 0x01,
0x28, 0x09, 0x52, 0x08, 0x72, 0x6f, 0x6f, 0x74, 0x43, 0x65, 0x72, 0x74, 0x12, 0x2d, 0x0a, 0x12,
0x69, 0x6e, 0x74, 0x65, 0x72, 0x6d, 0x65, 0x64, 0x69, 0x61, 0x74, 0x65, 0x5f, 0x63, 0x65, 0x72,
0x74, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x09, 0x52, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6d,
0x65, 0x64, 0x69, 0x61, 0x74, 0x65, 0x43, 0x65, 0x72, 0x74, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61,
0x63, 0x74, 0x69, 0x76, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x61, 0x63, 0x74,
0x69, 0x76, 0x65, 0x12, 0x40, 0x0a, 0x0e, 0x72, 0x6f, 0x74, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x6f,
0x75, 0x74, 0x5f, 0x61, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69,
0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0c, 0x72, 0x6f, 0x74, 0x61, 0x74, 0x65, 0x64,
0x4f, 0x75, 0x74, 0x41, 0x74, 0x32, 0x5b, 0x0a, 0x10, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74,
0x43, 0x41, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x47, 0x0a, 0x0a, 0x57, 0x61, 0x74,
0x63, 0x68, 0x52, 0x6f, 0x6f, 0x74, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a,
0x1d, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x63, 0x61, 0x2e, 0x57, 0x61, 0x74, 0x63,
0x68, 0x52, 0x6f, 0x6f, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00,
0x30, 0x01, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75,
0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2d, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x2f, 0x70,
0x62, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x63, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
}
var (
file_proto_public_pbconnectca_ca_proto_rawDescOnce sync.Once
file_proto_public_pbconnectca_ca_proto_rawDescData = file_proto_public_pbconnectca_ca_proto_rawDesc
)
func file_proto_public_pbconnectca_ca_proto_rawDescGZIP() []byte {
file_proto_public_pbconnectca_ca_proto_rawDescOnce.Do(func() {
file_proto_public_pbconnectca_ca_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_public_pbconnectca_ca_proto_rawDescData)
})
return file_proto_public_pbconnectca_ca_proto_rawDescData
}
var file_proto_public_pbconnectca_ca_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_proto_public_pbconnectca_ca_proto_goTypes = []interface{}{
(*WatchRootsResponse)(nil), // 0: connectca.WatchRootsResponse
(*CARoot)(nil), // 1: connectca.CARoot
(*timestamppb.Timestamp)(nil), // 2: google.protobuf.Timestamp
(*emptypb.Empty)(nil), // 3: google.protobuf.Empty
}
var file_proto_public_pbconnectca_ca_proto_depIdxs = []int32{
1, // 0: connectca.WatchRootsResponse.roots:type_name -> connectca.CARoot
2, // 1: connectca.CARoot.rotated_out_at:type_name -> google.protobuf.Timestamp
3, // 2: connectca.ConnectCAService.WatchRoots:input_type -> google.protobuf.Empty
0, // 3: connectca.ConnectCAService.WatchRoots:output_type -> connectca.WatchRootsResponse
3, // [3:4] is the sub-list for method output_type
2, // [2:3] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
}
func init() { file_proto_public_pbconnectca_ca_proto_init() }
func file_proto_public_pbconnectca_ca_proto_init() {
if File_proto_public_pbconnectca_ca_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_proto_public_pbconnectca_ca_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*WatchRootsResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_public_pbconnectca_ca_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*CARoot); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_proto_public_pbconnectca_ca_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_proto_public_pbconnectca_ca_proto_goTypes,
DependencyIndexes: file_proto_public_pbconnectca_ca_proto_depIdxs,
MessageInfos: file_proto_public_pbconnectca_ca_proto_msgTypes,
}.Build()
File_proto_public_pbconnectca_ca_proto = out.File
file_proto_public_pbconnectca_ca_proto_rawDesc = nil
file_proto_public_pbconnectca_ca_proto_goTypes = nil
file_proto_public_pbconnectca_ca_proto_depIdxs = nil
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6
// ConnectCAServiceClient is the client API for ConnectCAService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type ConnectCAServiceClient interface {
// WatchRoots provides a stream on which you can receive the list of active
// Connect CA roots. Current roots are sent immediately at the start of the
// stream, and new lists will be sent whenever the roots are rotated.
WatchRoots(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (ConnectCAService_WatchRootsClient, error)
}
type connectCAServiceClient struct {
cc grpc.ClientConnInterface
}
func NewConnectCAServiceClient(cc grpc.ClientConnInterface) ConnectCAServiceClient {
return &connectCAServiceClient{cc}
}
func (c *connectCAServiceClient) WatchRoots(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (ConnectCAService_WatchRootsClient, error) {
stream, err := c.cc.NewStream(ctx, &_ConnectCAService_serviceDesc.Streams[0], "/connectca.ConnectCAService/WatchRoots", opts...)
if err != nil {
return nil, err
}
x := &connectCAServiceWatchRootsClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type ConnectCAService_WatchRootsClient interface {
Recv() (*WatchRootsResponse, error)
grpc.ClientStream
}
type connectCAServiceWatchRootsClient struct {
grpc.ClientStream
}
func (x *connectCAServiceWatchRootsClient) Recv() (*WatchRootsResponse, error) {
m := new(WatchRootsResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// ConnectCAServiceServer is the server API for ConnectCAService service.
type ConnectCAServiceServer interface {
// WatchRoots provides a stream on which you can receive the list of active
// Connect CA roots. Current roots are sent immediately at the start of the
// stream, and new lists will be sent whenever the roots are rotated.
WatchRoots(*emptypb.Empty, ConnectCAService_WatchRootsServer) error
}
// UnimplementedConnectCAServiceServer can be embedded to have forward compatible implementations.
type UnimplementedConnectCAServiceServer struct {
}
func (*UnimplementedConnectCAServiceServer) WatchRoots(*emptypb.Empty, ConnectCAService_WatchRootsServer) error {
return status.Errorf(codes.Unimplemented, "method WatchRoots not implemented")
}
func RegisterConnectCAServiceServer(s *grpc.Server, srv ConnectCAServiceServer) {
s.RegisterService(&_ConnectCAService_serviceDesc, srv)
}
func _ConnectCAService_WatchRoots_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(emptypb.Empty)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(ConnectCAServiceServer).WatchRoots(m, &connectCAServiceWatchRootsServer{stream})
}
type ConnectCAService_WatchRootsServer interface {
Send(*WatchRootsResponse) error
grpc.ServerStream
}
type connectCAServiceWatchRootsServer struct {
grpc.ServerStream
}
func (x *connectCAServiceWatchRootsServer) Send(m *WatchRootsResponse) error {
return x.ServerStream.SendMsg(m)
}
var _ConnectCAService_serviceDesc = grpc.ServiceDesc{
ServiceName: "connectca.ConnectCAService",
HandlerType: (*ConnectCAServiceServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "WatchRoots",
Handler: _ConnectCAService_WatchRoots_Handler,
ServerStreams: true,
},
},
Metadata: "proto-public/pbconnectca/ca.proto",
}

View File

@ -0,0 +1,72 @@
syntax = "proto3";
package connectca;
option go_package = "github.com/hashicorp/consul/proto-public/pbconnectca";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
service ConnectCAService {
// WatchRoots provides a stream on which you can receive the list of active
// Connect CA roots. Current roots are sent immediately at the start of the
// stream, and new lists will be sent whenever the roots are rotated.
rpc WatchRoots(google.protobuf.Empty) returns (stream WatchRootsResponse) {};
}
message WatchRootsResponse {
// active_root_id is the ID of a root in Roots that is the active CA root.
// Other roots are still valid if they're in the Roots list but are in the
// process of being rotated out.
string active_root_id = 1;
// trust_domain is the identification root for this Consul cluster. All
// certificates signed by the cluster's CA must have their identifying URI
// in this domain.
//
// This does not include the protocol (currently spiffe://) since we may
// implement other protocols in future with equivalent semantics. It should
// be compared against the "authority" section of a URI (i.e. host:port).
string trust_domain = 2;
// roots is a list of root CA certs to trust.
repeated CARoot roots = 3;
}
message CARoot {
// id is a globally unique ID (UUID) representing this CA root.
string id = 1;
// name is a human-friendly name for this CA root. This value is opaque to
// Consul and is not used for anything internally.
string name = 2;
// serial_number is the x509 serial number of the certificate.
uint64 serial_number = 3;
// signing_key_id is the connect.HexString encoded id of the public key that
// corresponds to the private key used to sign leaf certificates in the
// local datacenter.
//
// The value comes from x509.Certificate.SubjectKeyId of the local leaf
// signing cert.
//
// See https://www.rfc-editor.org/rfc/rfc3280#section-4.2.1.1 for more detail.
string signing_key_id = 4;
// root_cert is the PEM-encoded public certificate.
string root_cert = 5;
// intermediate_certs is a list of PEM-encoded intermediate certs to
// attach to any leaf certs signed by this CA.
repeated string intermediate_certs = 6;
// active is true if this is the current active CA. This must only
// be true for exactly one CA.
bool active = 7;
// rotated_out_at is the time at which this CA was removed from the state.
// This will only be set on roots that have been rotated out from being the
// active root.
google.protobuf.Timestamp rotated_out_at = 8;
}