add ip rate limiter controller OSS parts (#16790)

This commit is contained in:
Dhia Ayachi 2023-03-27 17:00:25 -04:00 committed by GitHub
parent 045f39ddd9
commit 6da620159e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 109 additions and 60 deletions

View File

@ -290,97 +290,80 @@ func (c *FSM) registerStreamSnapshotHandlers() {
err := c.deps.Publisher.RegisterHandler(state.EventTopicServiceHealth, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().ServiceHealthSnapshot(req, buf)
}, false)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicServiceHealthConnect, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().ServiceHealthSnapshot(req, buf)
}, false)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicCARoots, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().CARootsSnapshot(req, buf)
}, false)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicMeshConfig, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().MeshConfigSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicServiceResolver, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().ServiceResolverSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicIngressGateway, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().IngressGatewaySnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicServiceIntentions, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().ServiceIntentionsSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicServiceList, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().ServiceListSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicServiceDefaults, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().ServiceDefaultsSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicAPIGateway, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().APIGatewaySnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicInlineCertificate, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().InlineCertificateSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicHTTPRoute, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().HTTPRouteSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicTCPRoute, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().TCPRouteSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicBoundAPIGateway, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().BoundAPIGatewaySnapshot(req, buf)
}, true)
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicIPRateLimit, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().IPRateLimiterSnapshot(req, buf)
}, true)
panicIfErr(err)
}
func panicIfErr(err error) {
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

View File

@ -410,6 +410,10 @@ type Server struct {
// embedded struct to hold all the enterprise specific data
EnterpriseServer
operatorServer *operator.Server
// routineManager is responsible for managing longer running go routines
// run by the Server
routineManager *routine.Manager
}
type connHandler interface {
@ -480,6 +484,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
fsm: fsm.NewFromDeps(fsmDeps),
publisher: flat.EventPublisher,
incomingRPCLimiter: incomingRPCLimiter,
routineManager: routine.NewManager(logger.Named(logging.ConsulServer)),
}
incomingRPCLimiter.Register(s)
@ -778,6 +783,11 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
// Now we are setup, configure the HCP manager
go s.hcpManager.Run(&lib.StopChannelContext{StopCh: shutdownCh})
err = s.runEnterpriseRateLimiterConfigEntryController()
if err != nil {
return nil, err
}
return s, nil
}

View File

@ -19,6 +19,11 @@ import (
"github.com/hashicorp/consul/lib"
)
// runEnterpriseRateLimiterConfigEntryController start the rate limiter config controller
func (s *Server) runEnterpriseRateLimiterConfigEntryController() error {
return nil
}
func (s *Server) registerEnterpriseGRPCServices(deps Deps, srv *grpc.Server) {}
func (s *Server) enterpriseValidateJoinWAN() error {

View File

@ -22,6 +22,7 @@ var configEntryKindToTopic = map[string]stream.Topic{
structs.HTTPRoute: EventTopicHTTPRoute,
structs.InlineCertificate: EventTopicInlineCertificate,
structs.BoundAPIGateway: EventTopicBoundAPIGateway,
structs.RateLimitIPConfig: EventTopicIPRateLimit,
}
// EventSubjectConfigEntry is a stream.Subject used to route and receive events
@ -152,6 +153,12 @@ func (s *Store) BoundAPIGatewaySnapshot(req stream.SubscribeRequest, buf stream.
return s.configEntrySnapshot(structs.BoundAPIGateway, req, buf)
}
// IPRateLimiterSnapshot is a stream.SnapshotFunc that returns a snapshot of
// "control-plane-request-limit" config entries.
func (s *Store) IPRateLimiterSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.RateLimitIPConfig, req, buf)
}
func (s *Store) configEntrySnapshot(kind string, req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
var (
idx uint64

View File

@ -191,6 +191,7 @@ var (
EventTopicHTTPRoute = pbsubscribe.Topic_HTTPRoute
EventTopicInlineCertificate = pbsubscribe.Topic_InlineCertificate
EventTopicBoundAPIGateway = pbsubscribe.Topic_BoundAPIGateway
EventTopicIPRateLimit = pbsubscribe.Topic_IPRateLimit
)
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {

View File

@ -453,6 +453,22 @@ var baseCases = map[string]testCase{
{Name: "kind", Value: "tcp-route"},
},
},
"consul.usage.test.consul.state.config_entries;datacenter=dc1;kind=control-plane-request-limit": {
Name: "consul.usage.test.consul.state.config_entries",
Value: 0,
Labels: []metrics.Label{
{Name: "datacenter", Value: "dc1"},
{Name: "kind", Value: "control-plane-request-limit"},
},
},
"consul.usage.test.state.config_entries;datacenter=dc1;kind=control-plane-request-limit": {
Name: "consul.usage.test.state.config_entries",
Value: 0,
Labels: []metrics.Label{
{Name: "datacenter", Value: "dc1"},
{Name: "kind", Value: "control-plane-request-limit"},
},
},
},
getMembersFunc: func() []serf.Member { return []serf.Member{} },
},
@ -896,6 +912,22 @@ var baseCases = map[string]testCase{
{Name: "kind", Value: "tcp-route"},
},
},
"consul.usage.test.consul.state.config_entries;datacenter=dc1;kind=control-plane-request-limit": {
Name: "consul.usage.test.consul.state.config_entries",
Value: 0,
Labels: []metrics.Label{
{Name: "datacenter", Value: "dc1"},
{Name: "kind", Value: "control-plane-request-limit"},
},
},
"consul.usage.test.state.config_entries;datacenter=dc1;kind=control-plane-request-limit": {
Name: "consul.usage.test.state.config_entries",
Value: 0,
Labels: []metrics.Label{
{Name: "datacenter", Value: "dc1"},
{Name: "kind", Value: "control-plane-request-limit"},
},
},
},
},
}

View File

@ -72,6 +72,8 @@ func newConfigEntryRequest(req *structs.ConfigEntryQuery, deps ServerDataSourceD
topic = pbsubscribe.Topic_InlineCertificate
case structs.BoundAPIGateway:
topic = pbsubscribe.Topic_BoundAPIGateway
case structs.RateLimitIPConfig:
topic = pbsubscribe.Topic_IPRateLimit
default:
return nil, fmt.Errorf("cannot map config entry kind: %s to a topic", req.Kind)
}

View File

@ -68,6 +68,7 @@ var AllConfigEntryKinds = []string{
HTTPRoute,
TCPRoute,
InlineCertificate,
RateLimitIPConfig,
}
// ConfigEntry is the interface for centralized configuration stored in Raft.

View File

@ -69,6 +69,8 @@ const (
Topic_InlineCertificate Topic = 12
// BoundAPIGateway topic contains events for changes to bound-api-gateways.
Topic_BoundAPIGateway Topic = 13
// IPRateLimit topic contains events for changes to control-plane-request-limit
Topic_IPRateLimit Topic = 14
)
// Enum value maps for Topic.
@ -88,6 +90,7 @@ var (
11: "HTTPRoute",
12: "InlineCertificate",
13: "BoundAPIGateway",
14: "IPRateLimit",
}
Topic_value = map[string]int32{
"Unknown": 0,
@ -104,6 +107,7 @@ var (
"HTTPRoute": 11,
"InlineCertificate": 12,
"BoundAPIGateway": 13,
"IPRateLimit": 14,
}
)
@ -985,7 +989,7 @@ var file_private_pbsubscribe_subscribe_proto_rawDesc = []byte{
0x6e, 0x74, 0x65, 0x72, 0x70, 0x72, 0x69, 0x73, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x52, 0x0e, 0x45,
0x6e, 0x74, 0x65, 0x72, 0x70, 0x72, 0x69, 0x73, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x1a, 0x0a,
0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x2a, 0x90, 0x02, 0x0a, 0x05, 0x54, 0x6f,
0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x2a, 0xa1, 0x02, 0x0a, 0x05, 0x54, 0x6f,
0x70, 0x69, 0x63, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x10, 0x00,
0x12, 0x11, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74,
0x68, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x48, 0x65,
@ -1002,26 +1006,27 @@ var file_private_pbsubscribe_subscribe_proto_rawDesc = []byte{
0x0a, 0x12, 0x0d, 0x0a, 0x09, 0x48, 0x54, 0x54, 0x50, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x10, 0x0b,
0x12, 0x15, 0x0a, 0x11, 0x49, 0x6e, 0x6c, 0x69, 0x6e, 0x65, 0x43, 0x65, 0x72, 0x74, 0x69, 0x66,
0x69, 0x63, 0x61, 0x74, 0x65, 0x10, 0x0c, 0x12, 0x13, 0x0a, 0x0f, 0x42, 0x6f, 0x75, 0x6e, 0x64,
0x41, 0x50, 0x49, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x10, 0x0d, 0x2a, 0x29, 0x0a, 0x09,
0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x4f, 0x70, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x65, 0x67,
0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x44, 0x65, 0x72, 0x65, 0x67,
0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x01, 0x32, 0x61, 0x0a, 0x17, 0x53, 0x74, 0x61, 0x74, 0x65,
0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69,
0x6f, 0x6e, 0x12, 0x46, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12,
0x1b, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73,
0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x73,
0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x08,
0xe2, 0x86, 0x04, 0x04, 0x08, 0x02, 0x10, 0x09, 0x30, 0x01, 0x42, 0x9a, 0x01, 0x0a, 0x0d, 0x63,
0x6f, 0x6d, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x42, 0x0e, 0x53, 0x75,
0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x35,
0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69,
0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x2f, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x2f, 0x70, 0x62, 0x73, 0x75, 0x62, 0x73,
0x63, 0x72, 0x69, 0x62, 0x65, 0xa2, 0x02, 0x03, 0x53, 0x58, 0x58, 0xaa, 0x02, 0x09, 0x53, 0x75,
0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xca, 0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
0x69, 0x62, 0x65, 0xe2, 0x02, 0x15, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x5c,
0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x53, 0x75,
0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x41, 0x50, 0x49, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x10, 0x0d, 0x12, 0x0f, 0x0a, 0x0b,
0x49, 0x50, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x10, 0x0e, 0x2a, 0x29, 0x0a,
0x09, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x4f, 0x70, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x65,
0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x44, 0x65, 0x72, 0x65,
0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x01, 0x32, 0x61, 0x0a, 0x17, 0x53, 0x74, 0x61, 0x74,
0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74,
0x69, 0x6f, 0x6e, 0x12, 0x46, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
0x12, 0x1b, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x53, 0x75, 0x62,
0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e,
0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22,
0x08, 0xe2, 0x86, 0x04, 0x04, 0x08, 0x02, 0x10, 0x09, 0x30, 0x01, 0x42, 0x9a, 0x01, 0x0a, 0x0d,
0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x42, 0x0e, 0x53,
0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a,
0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68,
0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x2f, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x2f, 0x70, 0x62, 0x73, 0x75, 0x62,
0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xa2, 0x02, 0x03, 0x53, 0x58, 0x58, 0xaa, 0x02, 0x09, 0x53,
0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xca, 0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63,
0x72, 0x69, 0x62, 0x65, 0xe2, 0x02, 0x15, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x53,
0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@ -94,6 +94,9 @@ enum Topic {
// BoundAPIGateway topic contains events for changes to bound-api-gateways.
BoundAPIGateway = 13;
// IPRateLimit topic contains events for changes to control-plane-request-limit
IPRateLimit = 14;
}
message NamedSubject {