sync changes to oss files made in enterprise (#10670)

This commit is contained in:
R.B. Boyer 2021-07-22 13:58:08 -05:00 committed by GitHub
parent 62ac98b564
commit 254557a1f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 585 additions and 83 deletions

View File

@ -367,6 +367,10 @@ func (f fakeGRPCConnPool) ClientConn(_ string) (*grpc.ClientConn, error) {
return nil, nil
}
func (f fakeGRPCConnPool) ClientConnLeader() (*grpc.ClientConn, error) {
return nil, nil
}
func TestAgent_ReconnectConfigWanDisabled(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -9,12 +9,8 @@ import (
"testing"
"time"
"github.com/hashicorp/go-hclog"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/grpc/resolver"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs"
@ -24,6 +20,11 @@ import (
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)
func testClientConfig(t *testing.T) (string, *Config) {
@ -169,6 +170,10 @@ func TestClient_LANReap(t *testing.T) {
}
func TestClient_JoinLAN_Invalid(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
@ -497,7 +502,9 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
tls, err := tlsutil.NewConfigurator(c.TLSConfig, logger)
require.NoError(t, err, "failed to create tls configuration")
r := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter), nil)
builder := resolver.NewServerResolverBuilder(resolver.Config{Authority: c.NodeName})
r := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter), builder)
resolver.Register(builder)
connPool := &pool.ConnPool{
Server: false,
@ -515,6 +522,8 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
Tokens: new(token.Store),
Router: r,
ConnPool: connPool,
GRPCConnPool: grpc.NewClientConnPool(builder, grpc.TLSWrapper(tls.OutgoingRPCWrapper()), tls.UseTLS),
LeaderForwarder: builder,
EnterpriseDeps: newDefaultDepsEnterprise(t, logger, c),
}
}

View File

@ -333,6 +333,10 @@ func getCAProviderWithLock(s *Server) (ca.Provider, *structs.CARoot) {
}
func TestLeader_Vault_PrimaryCA_IntermediateRenew(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
ca.SkipIfVaultNotPresent(t)
// no parallel execution because we change globals

View File

@ -17,9 +17,15 @@ type Deps struct {
Router *router.Router
ConnPool *pool.ConnPool
GRPCConnPool GRPCClientConner
LeaderForwarder LeaderForwarder
EnterpriseDeps
}
type GRPCClientConner interface {
ClientConn(datacenter string) (*grpc.ClientConn, error)
ClientConnLeader() (*grpc.ClientConn, error)
}
type LeaderForwarder interface {
UpdateLeaderAddr(leaderAddr string)
}

View File

@ -261,6 +261,10 @@ type Server struct {
// Used to do leader forwarding and provide fast lookup by server id and address
serverLookup *ServerLookup
// grpcLeaderForwarder is notified on leader change in order to keep the grpc
// resolver up to date.
grpcLeaderForwarder LeaderForwarder
// floodLock controls access to floodCh.
floodLock sync.RWMutex
floodCh []chan struct{}
@ -583,6 +587,8 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s)
s.grpcLeaderForwarder = flat.LeaderForwarder
go s.trackLeaderChanges()
// Initialize Autopilot. This must happen before starting leadership monitoring
// as establishing leadership could attempt to use autopilot and cause a panic.
@ -627,15 +633,15 @@ func newFSMFromConfig(logger hclog.Logger, gc *state.TombstoneGC, config *Config
}
func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler {
if !config.RPCConfig.EnableStreaming {
return agentgrpc.NoOpHandler{Logger: deps.Logger}
register := func(srv *grpc.Server) {
if config.RPCConfig.EnableStreaming {
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, subscribe.NewServer(
&subscribeBackend{srv: s, connPool: deps.GRPCConnPool},
deps.Logger.Named("grpc-api.subscription")))
}
s.registerEnterpriseGRPCServices(deps, srv)
}
register := func(srv *grpc.Server) {
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, subscribe.NewServer(
&subscribeBackend{srv: s, connPool: deps.GRPCConnPool},
deps.Logger.Named("grpc-api.subscription")))
}
return agentgrpc.NewHandler(config.RPCAddr, register)
}
@ -1450,6 +1456,33 @@ func (s *Server) isReadyForConsistentReads() bool {
return atomic.LoadInt32(&s.readyForConsistentReads) == 1
}
// trackLeaderChanges registers an Observer with raft in order to receive updates
// about leader changes, in order to keep the grpc resolver up to date for leader forwarding.
func (s *Server) trackLeaderChanges() {
obsCh := make(chan raft.Observation, 16)
observer := raft.NewObserver(obsCh, false, func(o *raft.Observation) bool {
_, ok := o.Data.(raft.LeaderObservation)
return ok
})
s.raft.RegisterObserver(observer)
for {
select {
case obs := <-obsCh:
leaderObs, ok := obs.Data.(raft.LeaderObservation)
if !ok {
s.logger.Debug("got unknown observation type from raft", "type", reflect.TypeOf(obs.Data))
continue
}
s.grpcLeaderForwarder.UpdateLeaderAddr(string(leaderObs.Leader))
case <-s.shutdownCh:
s.raft.DeregisterObserver(observer)
return
}
}
}
// peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same
// location.

View File

@ -4,9 +4,12 @@ package consul
import (
"github.com/hashicorp/serf/serf"
"google.golang.org/grpc"
)
func (s *Server) removeFailedNodeEnterprise(remove func(*serf.Serf, string) error, node, wanNode string) error {
// nothing to do for oss
return nil
}
func (s *Server) registerEnterpriseGRPCServices(deps Deps, srv *grpc.Server) {}

View File

@ -1826,6 +1826,12 @@ func (q NodeServiceQuery) NamespaceOrDefault() string {
return q.EnterpriseMeta.NamespaceOrDefault()
}
// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer
// receiver for this method. Remove once that is fixed.
func (q NodeServiceQuery) PartitionOrDefault() string {
return q.EnterpriseMeta.PartitionOrDefault()
}
// deleteCheckTxn is the inner method used to call a health
// check deletion within an existing transaction.
func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error {

View File

@ -540,3 +540,9 @@ type NodeCheckQuery struct {
func (q NodeCheckQuery) NamespaceOrDefault() string {
return q.EnterpriseMeta.NamespaceOrDefault()
}
// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer
// receiver for this method. Remove once that is fixed.
func (q NodeCheckQuery) PartitionOrDefault() string {
return q.EnterpriseMeta.PartitionOrDefault()
}

View File

@ -21,6 +21,12 @@ func (q Query) NamespaceOrDefault() string {
return q.EnterpriseMeta.NamespaceOrDefault()
}
// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer
// receiver for this method. Remove once that is fixed.
func (q Query) PartitionOrDefault() string {
return q.EnterpriseMeta.PartitionOrDefault()
}
// indexFromQuery builds an index key where Query.Value is lowercase, and is
// a required value.
func indexFromQuery(arg interface{}) ([]byte, error) {

View File

@ -107,10 +107,12 @@ func (tc indexerTestCase) run(t *testing.T, indexer memdb.Indexer) {
}
if i, ok := indexer.(memdb.MultiIndexer); ok {
valid, actual, err := i.FromObject(tc.writeMulti.source)
require.NoError(t, err)
require.True(t, valid)
require.Equal(t, tc.writeMulti.expected, actual)
t.Run("writeIndexMulti", func(t *testing.T) {
valid, actual, err := i.FromObject(tc.writeMulti.source)
require.NoError(t, err)
require.True(t, valid)
require.Equal(t, tc.writeMulti.expected, actual)
})
}
for _, extra := range tc.extra {

View File

@ -31,6 +31,7 @@ var _ subscribe.Backend = (*subscribeBackend)(nil)
// or if it matches the Datacenter in config.
//
// TODO: extract this so that it can be used with other grpc services.
// TODO: rename to ForwardToDC
func (s subscribeBackend) Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error) {
if dc == "" || dc == s.srv.config.Datacenter {
return false, nil

View File

@ -50,15 +50,25 @@ func NewClientConnPool(servers ServerLocator, tls TLSWrapper, useTLSForDC func(d
// existing connections in the pool, a new one will be created, stored in the pool,
// then returned.
func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error) {
return c.dial(datacenter, "server")
}
// TODO: godoc
func (c *ClientConnPool) ClientConnLeader() (*grpc.ClientConn, error) {
return c.dial("local", "leader")
}
func (c *ClientConnPool) dial(datacenter string, serverType string) (*grpc.ClientConn, error) {
c.connsLock.Lock()
defer c.connsLock.Unlock()
if conn, ok := c.conns[datacenter]; ok {
target := fmt.Sprintf("consul://%s/%s.%s", c.servers.Authority(), serverType, datacenter)
if conn, ok := c.conns[target]; ok {
return conn, nil
}
conn, err := grpc.Dial(
fmt.Sprintf("consul://%s/server.%s", c.servers.Authority(), datacenter),
target,
// use WithInsecure mode here because we handle the TLS wrapping in the
// custom dialer based on logic around whether the server has TLS enabled.
grpc.WithInsecure(),
@ -86,7 +96,7 @@ func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error)
return nil, err
}
c.conns[datacenter] = conn
c.conns[target] = conn
return conn, nil
}

View File

@ -113,6 +113,44 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
require.NotEqual(t, resp.ServerName, first.ServerName)
}
func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) {
count := 3
conf := newConfig(t)
res := resolver.NewServerResolverBuilder(conf)
registerWithGRPC(t, res)
pool := NewClientConnPool(res, nil, useTLSForDcAlwaysTrue)
var servers []testServer
for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
srv := newTestServer(t, name, "dc1")
res.AddServer(srv.Metadata())
servers = append(servers, srv)
t.Cleanup(srv.shutdown)
}
// Set the leader address to the first server.
res.UpdateLeaderAddr(servers[0].addr.String())
conn, err := pool.ClientConnLeader()
require.NoError(t, err)
client := testservice.NewSimpleClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
first, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)
require.Equal(t, first.ServerName, servers[0].name)
// Update the leader address and make another request.
res.UpdateLeaderAddr(servers[1].addr.String())
resp, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)
require.Equal(t, resp.ServerName, servers[1].name)
}
func newConfig(t *testing.T) resolver.Config {
n := t.Name()
s := strings.Replace(n, "/", "", -1)

View File

@ -16,13 +16,15 @@ import (
// ServerResolvers updated when changes occur.
type ServerResolverBuilder struct {
cfg Config
// leaderResolver is used to track the address of the leader in the local DC.
leaderResolver leaderResolver
// servers is an index of Servers by Server.ID. The map contains server IDs
// for all datacenters.
servers map[string]*metadata.Server
// resolvers is an index of connections to the serverResolver which manages
// addresses of servers for that connection.
resolvers map[resolver.ClientConn]*serverResolver
// lock for servers and resolvers.
// lock for all stateful fields (excludes config which is immutable).
lock sync.RWMutex
}
@ -89,9 +91,23 @@ func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.Client
if resolver, ok := s.resolvers[cc]; ok {
return resolver, nil
}
if cc == s.leaderResolver.clientConn {
return s.leaderResolver, nil
}
serverType, datacenter, err := parseEndpoint(target.Endpoint)
if err != nil {
return nil, err
}
if serverType == "leader" {
// TODO: is this safe? can we ever have multiple CC for the leader? Seems
// like we can only have one given the caching in ClientConnPool.Dial
s.leaderResolver.clientConn = cc
s.leaderResolver.updateClientConn()
return s.leaderResolver, nil
}
// Make a new resolver for the dc and add it to the list of active ones.
datacenter := strings.TrimPrefix(target.Endpoint, "server.")
resolver := &serverResolver{
datacenter: datacenter,
clientConn: cc,
@ -107,6 +123,16 @@ func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.Client
return resolver, nil
}
// parseEndpoint parses a string, expecting a format of "serverType.datacenter"
func parseEndpoint(target string) (string, string, error) {
parts := strings.SplitN(target, ".", 2)
if len(parts) != 2 {
return "", "", fmt.Errorf("unexpected endpoint address: %v", target)
}
return parts[0], parts[1], nil
}
func (s *ServerResolverBuilder) Authority() string {
return s.cfg.Authority
}
@ -168,6 +194,15 @@ func (s *ServerResolverBuilder) getDCAddrs(dc string) []resolver.Address {
return addrs
}
// UpdateLeaderAddr updates the leader address in the local DC's resolver.
func (s *ServerResolverBuilder) UpdateLeaderAddr(leaderAddr string) {
s.lock.Lock()
defer s.lock.Unlock()
s.leaderResolver.addr = leaderAddr
s.leaderResolver.updateClientConn()
}
// serverResolver is a grpc Resolver that will keep a grpc.ClientConn up to date
// on the list of server addresses to use.
type serverResolver struct {
@ -224,4 +259,29 @@ func (r *serverResolver) Close() {
}
// ResolveNow is not used
func (*serverResolver) ResolveNow(_ resolver.ResolveNowOption) {}
func (*serverResolver) ResolveNow(resolver.ResolveNowOption) {}
type leaderResolver struct {
addr string
clientConn resolver.ClientConn
}
func (l leaderResolver) ResolveNow(resolver.ResolveNowOption) {}
func (l leaderResolver) Close() {}
func (l leaderResolver) updateClientConn() {
if l.addr == "" || l.clientConn == nil {
return
}
addrs := []resolver.Address{
{
Addr: l.addr,
Type: resolver.Backend,
ServerName: "leader",
},
}
l.clientConn.UpdateState(resolver.State{Addresses: addrs})
}
var _ resolver.Resolver = (*leaderResolver)(nil)

View File

@ -31,6 +31,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto/pbcommon"
)
var HTTPSummaries = []prometheus.SummaryDefinition{
@ -901,6 +902,14 @@ func (s *HTTPHandlers) parseConsistency(resp http.ResponseWriter, req *http.Requ
return false
}
// parseConsistencyReadRequest is used to parse the ?consistent query param.
func parseConsistencyReadRequest(resp http.ResponseWriter, req *http.Request, b *pbcommon.ReadRequest) {
query := req.URL.Query()
if _, ok := query["consistent"]; ok {
b.RequireConsistent = true
}
}
// parseDC is used to parse the ?dc query param
func (s *HTTPHandlers) parseDC(req *http.Request, dc *string) {
if other := req.URL.Query().Get("dc"); other != "" {

View File

@ -277,16 +277,16 @@ func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
type testBackend struct {
store *state.Store
authorizer func(token string) acl.Authorizer
authorizer func(token string, entMeta *structs.EnterpriseMeta) acl.Authorizer
forwardConn *gogrpc.ClientConn
}
func (b testBackend) ResolveTokenAndDefaultMeta(
token string,
_ *structs.EnterpriseMeta,
entMeta *structs.EnterpriseMeta,
_ *acl.AuthorizerContext,
) (acl.Authorizer, error) {
return b.authorizer(token), nil
return b.authorizer(token, entMeta), nil
}
func (b testBackend) Forward(_ string, fn func(*gogrpc.ClientConn) error) (handled bool, err error) {
@ -306,7 +306,7 @@ func newTestBackend() (*testBackend, error) {
return nil, err
}
store := state.NewStateStoreWithEventPublisher(gc)
allowAll := func(_ string) acl.Authorizer {
allowAll := func(string, *structs.EnterpriseMeta) acl.Authorizer {
return acl.AllowAll()
}
return &testBackend{store: store, authorizer: allowAll}, nil
@ -612,7 +612,7 @@ node "node1" {
require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil))
// TODO: is there any easy way to do this with the acl package?
backend.authorizer = func(tok string) acl.Authorizer {
backend.authorizer = func(tok string, _ *structs.EnterpriseMeta) acl.Authorizer {
if tok == token {
return authorizer
}
@ -811,7 +811,7 @@ node "node1" {
require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil))
// TODO: is there any easy way to do this with the acl package?
backend.authorizer = func(tok string) acl.Authorizer {
backend.authorizer = func(tok string, _ *structs.EnterpriseMeta) acl.Authorizer {
if tok == token {
return authorizer
}

View File

@ -106,6 +106,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
builder := resolver.NewServerResolverBuilder(resolver.Config{})
resolver.Register(builder)
d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()), d.TLSConfigurator.UseTLS)
d.LeaderForwarder = builder
d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter), builder)

View File

@ -1,3 +1,5 @@
// +build !consulent
package structs
import (

View File

@ -76,6 +76,10 @@ const (
// HTTPNamespaceEnvVar defines an environment variable name which sets
// the HTTP Namespace to be used by default. This can still be overridden.
HTTPNamespaceEnvName = "CONSUL_NAMESPACE"
// HTTPPartitionEnvName defines an environment variable name which sets
// the HTTP Partition to be used by default. This can still be overridden.
HTTPPartitionEnvName = "CONSUL_PARTITION"
)
// QueryOptions are used to parameterize a query
@ -84,6 +88,10 @@ type QueryOptions struct {
// Note: Namespaces are available only in Consul Enterprise
Namespace string
// Partition overrides the `default` partition
// Note: Partitions are available only in Consul Enterprise
Partition string
// Providing a datacenter overwrites the DC provided
// by the Config
Datacenter string
@ -191,6 +199,10 @@ type WriteOptions struct {
// Note: Namespaces are available only in Consul Enterprise
Namespace string
// Partition overrides the `default` partition
// Note: Partitions are available only in Consul Enterprise
Partition string
// Providing a datacenter overwrites the DC provided
// by the Config
Datacenter string
@ -314,6 +326,10 @@ type Config struct {
// when no other Namespace is present in the QueryOptions
Namespace string
// Partition is the name of the partition to send along for the request
// when no other Partition is present in the QueryOptions
Partition string
TLSConfig TLSConfig
}
@ -466,6 +482,10 @@ func defaultConfig(logger hclog.Logger, transportFn func() *http.Transport) *Con
config.Namespace = v
}
if v := os.Getenv(HTTPPartitionEnvName); v != "" {
config.Partition = v
}
return config
}
@ -732,6 +752,9 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.Namespace != "" {
r.params.Set("ns", q.Namespace)
}
if q.Partition != "" {
r.params.Set("partition", q.Partition)
}
if q.Datacenter != "" {
r.params.Set("dc", q.Datacenter)
}
@ -834,6 +857,9 @@ func (r *request) setWriteOptions(q *WriteOptions) {
if q.Namespace != "" {
r.params.Set("ns", q.Namespace)
}
if q.Partition != "" {
r.params.Set("partition", q.Partition)
}
if q.Datacenter != "" {
r.params.Set("dc", q.Datacenter)
}
@ -908,6 +934,9 @@ func (c *Client) newRequest(method, path string) *request {
if c.config.Namespace != "" {
r.params.Set("ns", c.config.Namespace)
}
if c.config.Partition != "" {
r.params.Set("partition", c.config.Partition)
}
if c.config.WaitTime != 0 {
r.params.Set("wait", durToMsec(r.config.WaitTime))
}

View File

@ -738,6 +738,7 @@ func TestAPI_SetQueryOptions(t *testing.T) {
r := c.newRequest("GET", "/v1/kv/foo")
q := &QueryOptions{
Namespace: "operator",
Partition: "asdf",
Datacenter: "foo",
AllowStale: true,
RequireConsistent: true,
@ -752,6 +753,9 @@ func TestAPI_SetQueryOptions(t *testing.T) {
if r.params.Get("ns") != "operator" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("partition") != "asdf" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("dc") != "foo" {
t.Fatalf("bad: %v", r.params)
}
@ -799,6 +803,7 @@ func TestAPI_SetWriteOptions(t *testing.T) {
r := c.newRequest("GET", "/v1/kv/foo")
q := &WriteOptions{
Namespace: "operator",
Partition: "asdf",
Datacenter: "foo",
Token: "23456",
}
@ -806,6 +811,9 @@ func TestAPI_SetWriteOptions(t *testing.T) {
if r.params.Get("ns") != "operator" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("partition") != "asdf" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("dc") != "foo" {
t.Fatalf("bad: %v", r.params)
}

View File

@ -49,6 +49,7 @@ const (
Session string = "session"
Sentinel string = "sentinel"
Snapshot string = "snapshot"
Partition string = "partition"
TerminatingGateway string = "terminating_gateway"
TLSUtil string = "tlsutil"
Transaction string = "txn"

View File

@ -74,6 +74,14 @@ func (q *QueryOptions) SetStaleIfError(staleIfError time.Duration) {
q.StaleIfError = staleIfError
}
func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool {
o := structs.QueryOptions{
MaxQueryTime: q.MaxQueryTime,
MinQueryIndex: q.MinQueryIndex,
}
return o.HasTimedOut(start, rpcHoldTimeout, maxQueryTime, defaultQueryTime)
}
// SetFilter is needed to implement the structs.QueryOptionsCompat interface
func (q *QueryOptions) SetFilter(filter string) {
q.Filter = filter
@ -121,6 +129,10 @@ func (w WriteRequest) AllowStaleRead() bool {
return false
}
func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) bool {
return time.Since(start) > rpcHoldTimeout
}
func (td TargetDatacenter) RequestDatacenter() string {
return td.Datacenter
}

View File

@ -37,6 +37,16 @@ func (msg *WriteRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *ReadRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *ReadRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *QueryOptions) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)

View File

@ -152,6 +152,67 @@ func (m *WriteRequest) GetToken() string {
return ""
}
// ReadRequest is a type that may be embedded into any requests for read
// operations.
// It is a replacement for QueryOptions now that we no longer need any of those
// fields because we are moving away from using blocking queries.
// It is also similar to WriteRequest. It is a separate type so that in the
// future we can introduce fields that may only be relevant for reads.
type ReadRequest struct {
// Token is the ACL token ID. If not provided, the 'anonymous'
// token is assumed for backwards compatibility.
Token string `protobuf:"bytes,1,opt,name=Token,proto3" json:"Token,omitempty"`
// RequireConsistent indicates that the request must be sent to the leader.
RequireConsistent bool `protobuf:"varint,2,opt,name=RequireConsistent,proto3" json:"RequireConsistent,omitempty"`
}
func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_a6f5ac44994d718c, []int{3}
}
func (m *ReadRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *ReadRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_ReadRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *ReadRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReadRequest.Merge(m, src)
}
func (m *ReadRequest) XXX_Size() int {
return m.Size()
}
func (m *ReadRequest) XXX_DiscardUnknown() {
xxx_messageInfo_ReadRequest.DiscardUnknown(m)
}
var xxx_messageInfo_ReadRequest proto.InternalMessageInfo
func (m *ReadRequest) GetToken() string {
if m != nil {
return m.Token
}
return ""
}
func (m *ReadRequest) GetRequireConsistent() bool {
if m != nil {
return m.RequireConsistent
}
return false
}
// QueryOptions is used to specify various flags for read queries
type QueryOptions struct {
// Token is the ACL token ID. If not provided, the 'anonymous'
@ -209,7 +270,7 @@ func (m *QueryOptions) Reset() { *m = QueryOptions{} }
func (m *QueryOptions) String() string { return proto.CompactTextString(m) }
func (*QueryOptions) ProtoMessage() {}
func (*QueryOptions) Descriptor() ([]byte, []int) {
return fileDescriptor_a6f5ac44994d718c, []int{3}
return fileDescriptor_a6f5ac44994d718c, []int{4}
}
func (m *QueryOptions) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -336,7 +397,7 @@ func (m *QueryMeta) Reset() { *m = QueryMeta{} }
func (m *QueryMeta) String() string { return proto.CompactTextString(m) }
func (*QueryMeta) ProtoMessage() {}
func (*QueryMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_a6f5ac44994d718c, []int{4}
return fileDescriptor_a6f5ac44994d718c, []int{5}
}
func (m *QueryMeta) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -398,15 +459,15 @@ func (m *QueryMeta) GetConsistencyLevel() string {
type EnterpriseMeta struct {
// Namespace in which the entity exists.
Namespace string `protobuf:"bytes,1,opt,name=Namespace,proto3" json:"Namespace,omitempty"`
// Tenant in which the entity exists.
Tenant string `protobuf:"bytes,2,opt,name=Tenant,proto3" json:"Tenant,omitempty"`
// Partition in which the entity exists.
Partition string `protobuf:"bytes,2,opt,name=Partition,proto3" json:"Partition,omitempty"`
}
func (m *EnterpriseMeta) Reset() { *m = EnterpriseMeta{} }
func (m *EnterpriseMeta) String() string { return proto.CompactTextString(m) }
func (*EnterpriseMeta) ProtoMessage() {}
func (*EnterpriseMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_a6f5ac44994d718c, []int{5}
return fileDescriptor_a6f5ac44994d718c, []int{6}
}
func (m *EnterpriseMeta) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -439,6 +500,7 @@ func init() {
proto.RegisterType((*RaftIndex)(nil), "common.RaftIndex")
proto.RegisterType((*TargetDatacenter)(nil), "common.TargetDatacenter")
proto.RegisterType((*WriteRequest)(nil), "common.WriteRequest")
proto.RegisterType((*ReadRequest)(nil), "common.ReadRequest")
proto.RegisterType((*QueryOptions)(nil), "common.QueryOptions")
proto.RegisterType((*QueryMeta)(nil), "common.QueryMeta")
proto.RegisterType((*EnterpriseMeta)(nil), "common.EnterpriseMeta")
@ -447,45 +509,46 @@ func init() {
func init() { proto.RegisterFile("proto/pbcommon/common.proto", fileDescriptor_a6f5ac44994d718c) }
var fileDescriptor_a6f5ac44994d718c = []byte{
// 606 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0xcd, 0x6e, 0xd3, 0x4c,
0x14, 0x8d, 0xbf, 0x2f, 0x0d, 0xf1, 0x4d, 0x5b, 0x95, 0x51, 0x85, 0x4c, 0x41, 0x4e, 0x14, 0x21,
0x54, 0x55, 0x10, 0x4b, 0x65, 0x57, 0x56, 0x6d, 0xda, 0xa2, 0x8a, 0x98, 0x0a, 0x13, 0x84, 0xc4,
0x6e, 0xe2, 0xdc, 0x38, 0x23, 0xec, 0x19, 0x33, 0x1e, 0xb7, 0xe9, 0x1b, 0xb0, 0x64, 0x59, 0xb1,
0xe2, 0x41, 0x78, 0x80, 0x2e, 0xbb, 0x64, 0x55, 0xa0, 0x79, 0x03, 0x9e, 0x00, 0x79, 0xec, 0xb6,
0x2e, 0xed, 0x22, 0xac, 0x92, 0x73, 0x7c, 0xce, 0xcc, 0xfd, 0x39, 0x36, 0x3c, 0x88, 0xa5, 0x50,
0xc2, 0x89, 0x07, 0xbe, 0x88, 0x22, 0xc1, 0x9d, 0xfc, 0xa7, 0xa3, 0x59, 0x52, 0xcb, 0xd1, 0x8a,
0x1d, 0x08, 0x11, 0x84, 0xe8, 0x68, 0x76, 0x90, 0x8e, 0x9c, 0x61, 0x2a, 0xa9, 0x62, 0x17, 0xba,
0x95, 0xe5, 0x40, 0x04, 0x22, 0x3f, 0x28, 0xfb, 0x97, 0xb3, 0xed, 0x08, 0x4c, 0x8f, 0x8e, 0xd4,
0x1e, 0x1f, 0xe2, 0x84, 0x38, 0xd0, 0xe8, 0x4a, 0xa4, 0x0a, 0x35, 0xb4, 0x8c, 0x96, 0xb1, 0x5a,
0xdd, 0x5a, 0xf8, 0x7d, 0xd6, 0x34, 0x07, 0x38, 0x89, 0xe5, 0x46, 0xfb, 0x69, 0xdb, 0x2b, 0x2b,
0x32, 0x83, 0x2b, 0x86, 0x6c, 0x74, 0x94, 0x1b, 0xfe, 0xbb, 0xd5, 0x50, 0x52, 0xb4, 0xd7, 0x61,
0xa9, 0x4f, 0x65, 0x80, 0x6a, 0x9b, 0x2a, 0xea, 0x23, 0x57, 0x28, 0x89, 0x0d, 0x70, 0x85, 0xf4,
0xa5, 0xa6, 0x57, 0x62, 0xda, 0x6b, 0x30, 0xff, 0x4e, 0x32, 0x85, 0x1e, 0x7e, 0x4c, 0x31, 0x51,
0x64, 0x19, 0xe6, 0xfa, 0xe2, 0x03, 0xf2, 0x42, 0x9a, 0x83, 0x8d, 0xea, 0xa7, 0xaf, 0x4d, 0xa3,
0xfd, 0xa5, 0x0a, 0xf3, 0xaf, 0x53, 0x94, 0x47, 0xfb, 0x71, 0xd6, 0x7a, 0x72, 0xbb, 0x98, 0x3c,
0x82, 0x05, 0x97, 0x71, 0x2d, 0x2c, 0x55, 0xee, 0x5d, 0x27, 0xc9, 0x0b, 0x98, 0x77, 0xe9, 0x44,
0x13, 0x7d, 0x16, 0xa1, 0xf5, 0x7f, 0xcb, 0x58, 0x6d, 0xac, 0xdf, 0xef, 0xe4, 0x83, 0xee, 0x5c,
0x0c, 0xba, 0xb3, 0x5d, 0x0c, 0x7a, 0xab, 0x7e, 0x72, 0xd6, 0xac, 0x1c, 0xff, 0x68, 0x1a, 0xde,
0x35, 0x63, 0xd6, 0xe1, 0x66, 0x18, 0x8a, 0xc3, 0x37, 0x8a, 0x86, 0x68, 0x55, 0x5b, 0xc6, 0x6a,
0xdd, 0x2b, 0x31, 0xe4, 0x09, 0xdc, 0xcd, 0x9a, 0x63, 0x12, 0xbb, 0x82, 0x27, 0x2c, 0x51, 0xc8,
0x95, 0x35, 0xa7, 0x65, 0x37, 0x1f, 0x90, 0x15, 0xa8, 0xbf, 0x4d, 0xb0, 0x4b, 0xfd, 0x31, 0x5a,
0x35, 0x2d, 0xba, 0xc4, 0x64, 0x1f, 0x96, 0x5c, 0x3a, 0xd1, 0xa7, 0x5e, 0x54, 0x65, 0xdd, 0x99,
0xbd, 0xec, 0x1b, 0x66, 0xf2, 0x1c, 0x6a, 0x2e, 0x9d, 0x6c, 0x06, 0x68, 0xd5, 0x67, 0x3f, 0xa6,
0xb0, 0x90, 0xc7, 0xb0, 0xe8, 0xa6, 0x89, 0xf2, 0xf0, 0x80, 0x86, 0x6c, 0x48, 0x15, 0x5a, 0xa6,
0xae, 0xf7, 0x2f, 0x36, 0x1b, 0xb4, 0xbe, 0x75, 0x6f, 0xb4, 0x23, 0xa5, 0x90, 0x16, 0xfc, 0xc3,
0xa0, 0xcb, 0x46, 0x72, 0x0f, 0x6a, 0xbb, 0x2c, 0xcc, 0x62, 0xd4, 0xd0, 0xeb, 0x2e, 0x50, 0x11,
0x8e, 0x6f, 0x06, 0x98, 0x7a, 0x29, 0x2e, 0x2a, 0x9a, 0x25, 0xa3, 0x14, 0x73, 0x2f, 0x07, 0x64,
0x07, 0x1a, 0x3d, 0x9a, 0xa8, 0xae, 0xe0, 0x8a, 0xfa, 0x4a, 0xe7, 0x62, 0xc6, 0x4a, 0xca, 0x3e,
0xd2, 0x82, 0xc6, 0x4b, 0x2e, 0x0e, 0x79, 0x0f, 0xe9, 0x10, 0xa5, 0x4e, 0x4e, 0xdd, 0x2b, 0x53,
0x64, 0x0d, 0x96, 0x2e, 0x77, 0xea, 0x1f, 0xf5, 0xf0, 0x00, 0x43, 0x9d, 0x0c, 0xd3, 0xbb, 0xc1,
0x17, 0xe5, 0xef, 0xc2, 0xe2, 0x4e, 0xf6, 0x42, 0xc4, 0x92, 0x25, 0xa8, 0x5b, 0x78, 0x08, 0xe6,
0x2b, 0x1a, 0x61, 0x12, 0x53, 0x1f, 0x8b, 0x80, 0x5f, 0x11, 0xd9, 0x30, 0xfa, 0xc8, 0x29, 0xcf,
0xbb, 0x30, 0xbd, 0x02, 0x6d, 0xf5, 0x4e, 0x7e, 0xd9, 0x95, 0x93, 0x73, 0xdb, 0x38, 0x3d, 0xb7,
0x8d, 0x9f, 0xe7, 0xb6, 0xf1, 0x79, 0x6a, 0x57, 0x8e, 0xa7, 0x76, 0xe5, 0x74, 0x6a, 0x57, 0xbe,
0x4f, 0xed, 0xca, 0xfb, 0xb5, 0x80, 0xa9, 0x71, 0x3a, 0xe8, 0xf8, 0x22, 0x72, 0xc6, 0x34, 0x19,
0x33, 0x5f, 0xc8, 0xd8, 0xf1, 0x05, 0x4f, 0xd2, 0xd0, 0xb9, 0xfe, 0x2d, 0x1a, 0xd4, 0x34, 0x7e,
0xf6, 0x27, 0x00, 0x00, 0xff, 0xff, 0x6f, 0x3a, 0xc6, 0x28, 0xa4, 0x04, 0x00, 0x00,
// 620 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x41, 0x4f, 0xd4, 0x40,
0x14, 0xde, 0xe2, 0xb2, 0x6e, 0xdf, 0x02, 0xc1, 0x09, 0x31, 0x15, 0x4d, 0x97, 0x6c, 0x8c, 0x21,
0x44, 0xb7, 0x09, 0xde, 0xf0, 0x04, 0x0b, 0x1a, 0xe2, 0x56, 0x74, 0xc4, 0x90, 0x78, 0x9b, 0xed,
0xbe, 0xed, 0x4e, 0x6c, 0x3b, 0x75, 0x3a, 0x85, 0xe5, 0x1f, 0x78, 0xf4, 0x48, 0x3c, 0xf9, 0x43,
0xfc, 0x01, 0x1c, 0x39, 0x7a, 0x42, 0x65, 0xff, 0x81, 0xbf, 0xc0, 0x74, 0x5a, 0xa0, 0x08, 0x18,
0x3c, 0xed, 0x7e, 0xdf, 0x7c, 0xdf, 0xeb, 0x9b, 0xf7, 0xbe, 0x16, 0xee, 0xc7, 0x52, 0x28, 0xe1,
0xc4, 0x3d, 0x4f, 0x84, 0xa1, 0x88, 0x9c, 0xfc, 0xa7, 0xad, 0x59, 0x52, 0xcb, 0xd1, 0xbc, 0xed,
0x0b, 0xe1, 0x07, 0xe8, 0x68, 0xb6, 0x97, 0x0e, 0x9c, 0x7e, 0x2a, 0x99, 0xe2, 0xa7, 0xba, 0xf9,
0x39, 0x5f, 0xf8, 0x22, 0x2f, 0x94, 0xfd, 0xcb, 0xd9, 0x56, 0x08, 0x26, 0x65, 0x03, 0xb5, 0x19,
0xf5, 0x71, 0x44, 0x1c, 0x68, 0x74, 0x24, 0x32, 0x85, 0x1a, 0x5a, 0xc6, 0x82, 0xb1, 0x58, 0x5d,
0x9b, 0xfe, 0x7d, 0xdc, 0x34, 0x7b, 0x38, 0x8a, 0xe5, 0x4a, 0xeb, 0x49, 0x8b, 0x96, 0x15, 0x99,
0xc1, 0x15, 0x7d, 0x3e, 0xd8, 0xcf, 0x0d, 0x13, 0x57, 0x1a, 0x4a, 0x8a, 0xd6, 0x32, 0xcc, 0x6e,
0x33, 0xe9, 0xa3, 0x5a, 0x67, 0x8a, 0x79, 0x18, 0x29, 0x94, 0xc4, 0x06, 0x38, 0x47, 0xfa, 0xa1,
0x26, 0x2d, 0x31, 0xad, 0x25, 0x98, 0xda, 0x91, 0x5c, 0x21, 0xc5, 0x8f, 0x29, 0x26, 0x8a, 0xcc,
0xc1, 0xe4, 0xb6, 0xf8, 0x80, 0x51, 0x21, 0xcd, 0xc1, 0x4a, 0xf5, 0xd3, 0xd7, 0xa6, 0xd1, 0xda,
0x81, 0x06, 0x45, 0xd6, 0xff, 0xa7, 0x94, 0x3c, 0x86, 0x3b, 0x99, 0x80, 0x4b, 0xec, 0x88, 0x28,
0xe1, 0x89, 0xc2, 0x48, 0xe9, 0xde, 0xeb, 0xf4, 0xf2, 0x41, 0x51, 0xf8, 0x4b, 0x15, 0xa6, 0xde,
0xa4, 0x28, 0xf7, 0xb7, 0xe2, 0x6c, 0xa6, 0xc9, 0x35, 0xa5, 0x1f, 0xc2, 0xb4, 0xcb, 0x23, 0x2d,
0x2c, 0x8d, 0x84, 0x5e, 0x24, 0xc9, 0x0b, 0x98, 0x72, 0xd9, 0x48, 0x13, 0xdb, 0x3c, 0x44, 0xeb,
0xd6, 0x82, 0xb1, 0xd8, 0x58, 0xbe, 0xd7, 0xce, 0x37, 0xd8, 0x3e, 0xdd, 0x60, 0x7b, 0xbd, 0xd8,
0xe0, 0x5a, 0xfd, 0xf0, 0xb8, 0x59, 0x39, 0xf8, 0xd1, 0x34, 0xe8, 0x05, 0x63, 0x36, 0xba, 0xd5,
0x20, 0x10, 0x7b, 0x6f, 0x15, 0x0b, 0xd0, 0xaa, 0xea, 0x2b, 0x94, 0x98, 0xab, 0x6f, 0x3a, 0x79,
0xcd, 0x4d, 0xc9, 0x3c, 0xd4, 0xdf, 0x25, 0xd8, 0x61, 0xde, 0x10, 0xad, 0x9a, 0x16, 0x9d, 0x61,
0xb2, 0x05, 0xb3, 0x2e, 0x1b, 0xe9, 0xaa, 0xa7, 0x5d, 0x59, 0xb7, 0x6f, 0xde, 0xf6, 0x25, 0x33,
0x79, 0x06, 0x35, 0x97, 0x8d, 0x56, 0x7d, 0xb4, 0xea, 0x37, 0x2f, 0x53, 0x58, 0xc8, 0x23, 0x98,
0x71, 0xd3, 0x44, 0x51, 0xdc, 0x65, 0x01, 0xef, 0x33, 0x85, 0x96, 0xa9, 0xfb, 0xfd, 0x8b, 0xcd,
0x06, 0xad, 0x9f, 0xba, 0x39, 0xd8, 0x90, 0x52, 0x48, 0x0b, 0xfe, 0x63, 0xd0, 0x65, 0x23, 0xb9,
0x0b, 0xb5, 0xe7, 0x3c, 0xc8, 0xf2, 0xd9, 0xd0, 0xeb, 0x2e, 0x50, 0x11, 0x8e, 0x6f, 0x06, 0x98,
0x7a, 0x29, 0x2e, 0x2a, 0x96, 0x25, 0xa3, 0xf4, 0xfe, 0xd0, 0x1c, 0x90, 0x0d, 0x68, 0x74, 0x59,
0xa2, 0x3a, 0x22, 0x52, 0xcc, 0xcb, 0xe3, 0x76, 0xc3, 0x4e, 0xca, 0x3e, 0xb2, 0x00, 0x8d, 0x97,
0x91, 0xd8, 0x8b, 0xba, 0xc8, 0xfa, 0x28, 0x75, 0x72, 0xea, 0xb4, 0x4c, 0x91, 0x25, 0x98, 0x3d,
0xdb, 0xa9, 0xb7, 0xdf, 0xc5, 0x5d, 0x0c, 0x74, 0x32, 0x4c, 0x7a, 0x89, 0x2f, 0xda, 0xef, 0xc2,
0xcc, 0x46, 0xf6, 0xa6, 0xc5, 0x92, 0x27, 0xa8, 0xaf, 0xf0, 0x00, 0xcc, 0x57, 0x2c, 0xc4, 0x24,
0x66, 0x1e, 0x16, 0x01, 0x3f, 0x27, 0xb2, 0xd3, 0xd7, 0x4c, 0x2a, 0xae, 0x43, 0x30, 0x91, 0x9f,
0x9e, 0x11, 0x6b, 0xdd, 0xc3, 0x5f, 0x76, 0xe5, 0xf0, 0xc4, 0x36, 0x8e, 0x4e, 0x6c, 0xe3, 0xe7,
0x89, 0x6d, 0x7c, 0x1e, 0xdb, 0x95, 0x83, 0xb1, 0x5d, 0x39, 0x1a, 0xdb, 0x95, 0xef, 0x63, 0xbb,
0xf2, 0x7e, 0xc9, 0xe7, 0x6a, 0x98, 0xf6, 0xda, 0x9e, 0x08, 0x9d, 0x21, 0x4b, 0x86, 0xdc, 0x13,
0x32, 0x76, 0x3c, 0x11, 0x25, 0x69, 0xe0, 0x5c, 0xfc, 0xd4, 0xf5, 0x6a, 0x1a, 0x3f, 0xfd, 0x13,
0x00, 0x00, 0xff, 0xff, 0x9c, 0xf6, 0xbd, 0xcc, 0x03, 0x05, 0x00, 0x00,
}
func (m *RaftIndex) Marshal() (dAtA []byte, err error) {
@ -581,6 +644,46 @@ func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *ReadRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *ReadRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.RequireConsistent {
i--
if m.RequireConsistent {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i--
dAtA[i] = 0x10
}
if len(m.Token) > 0 {
i -= len(m.Token)
copy(dAtA[i:], m.Token)
i = encodeVarintCommon(dAtA, i, uint64(len(m.Token)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *QueryOptions) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -768,10 +871,10 @@ func (m *EnterpriseMeta) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if len(m.Tenant) > 0 {
i -= len(m.Tenant)
copy(dAtA[i:], m.Tenant)
i = encodeVarintCommon(dAtA, i, uint64(len(m.Tenant)))
if len(m.Partition) > 0 {
i -= len(m.Partition)
copy(dAtA[i:], m.Partition)
i = encodeVarintCommon(dAtA, i, uint64(len(m.Partition)))
i--
dAtA[i] = 0x12
}
@ -837,6 +940,22 @@ func (m *WriteRequest) Size() (n int) {
return n
}
func (m *ReadRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Token)
if l > 0 {
n += 1 + l + sovCommon(uint64(l))
}
if m.RequireConsistent {
n += 2
}
return n
}
func (m *QueryOptions) Size() (n int) {
if m == nil {
return 0
@ -908,7 +1027,7 @@ func (m *EnterpriseMeta) Size() (n int) {
if l > 0 {
n += 1 + l + sovCommon(uint64(l))
}
l = len(m.Tenant)
l = len(m.Partition)
if l > 0 {
n += 1 + l + sovCommon(uint64(l))
}
@ -1182,6 +1301,111 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *ReadRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCommon
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ReadRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ReadRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Token", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCommon
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthCommon
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthCommon
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Token = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field RequireConsistent", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCommon
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.RequireConsistent = bool(v != 0)
default:
iNdEx = preIndex
skippy, err := skipCommon(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthCommon
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthCommon
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *QueryOptions) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
@ -1750,7 +1974,7 @@ func (m *EnterpriseMeta) Unmarshal(dAtA []byte) error {
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Tenant", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field Partition", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
@ -1778,7 +2002,7 @@ func (m *EnterpriseMeta) Unmarshal(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Tenant = string(dAtA[iNdEx:postIndex])
m.Partition = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex

View File

@ -38,6 +38,24 @@ message WriteRequest {
string Token = 1;
}
// ReadRequest is a type that may be embedded into any requests for read
// operations.
// It is a replacement for QueryOptions now that we no longer need any of those
// fields because we are moving away from using blocking queries.
// It is also similar to WriteRequest. It is a separate type so that in the
// future we can introduce fields that may only be relevant for reads.
message ReadRequest {
option (gogoproto.goproto_getters) = true;
// Token is the ACL token ID. If not provided, the 'anonymous'
// token is assumed for backwards compatibility.
string Token = 1;
// RequireConsistent indicates that the request must be sent to the leader.
bool RequireConsistent = 2;
}
// QueryOptions is used to specify various flags for read queries
message QueryOptions {
// The autogenerated getters will implement half of the
@ -139,6 +157,6 @@ message QueryMeta {
message EnterpriseMeta {
// Namespace in which the entity exists.
string Namespace = 1;
// Tenant in which the entity exists.
string Tenant = 2;
// Partition in which the entity exists.
string Partition = 2;
}