Add custom balancer to always remove subConns (#15701)
The new balancer is a patched version of gRPC's default pick_first balancer which removes the behavior of preserving the active subconnection if a list of new addresses contains the currently active address.
This commit is contained in:
parent
eff52f73be
commit
f8868c7ccf
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:improvement
|
||||||
|
grpc: Use new balancer implementation to reduce periodic WARN logs when shuffling servers.
|
||||||
|
```
|
|
@ -0,0 +1,87 @@
|
||||||
|
package balancer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"google.golang.org/grpc/balancer"
|
||||||
|
"google.golang.org/grpc/connectivity"
|
||||||
|
"google.golang.org/grpc/grpclog"
|
||||||
|
"google.golang.org/grpc/resolver"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
balancer.Register(newCustomPickfirstBuilder())
|
||||||
|
}
|
||||||
|
|
||||||
|
// logger is referenced in pickfirst.go.
|
||||||
|
// The gRPC library uses the same component name.
|
||||||
|
// It is declared at package level because both the custom balancer and the
// copied pick_first code in this package log through it.
var logger = grpclog.Component("balancer")
|
||||||
|
|
||||||
|
func newCustomPickfirstBuilder() balancer.Builder {
|
||||||
|
return &customPickfirstBuilder{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// customPickfirstBuilder is the stateless balancer.Builder for
// customPickfirstBalancer.
type customPickfirstBuilder struct{}
|
||||||
|
|
||||||
|
func (*customPickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
|
||||||
|
return &customPickfirstBalancer{
|
||||||
|
pickfirstBalancer: pickfirstBalancer{cc: cc},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*customPickfirstBuilder) Name() string {
|
||||||
|
return "pick_first_custom"
|
||||||
|
}
|
||||||
|
|
||||||
|
// customPickfirstBalancer overrides UpdateClientConnState of pickfirstBalancer.
|
||||||
|
type customPickfirstBalancer struct {
|
||||||
|
pickfirstBalancer
|
||||||
|
|
||||||
|
activeAddr resolver.Address
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *customPickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
|
||||||
|
for _, a := range state.ResolverState.Addresses {
|
||||||
|
// This hack preserves an existing behavior in our client-side
|
||||||
|
// load balancing where if the first address in a shuffled list
|
||||||
|
// of addresses matched the currently connected address, it would
|
||||||
|
// be an effective no-op.
|
||||||
|
if a.Equal(b.activeAddr) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to make a new SubConn with a single address so we can
|
||||||
|
// track a successful connection explicitly. If we were to pass
|
||||||
|
// a list of addresses, we cannot assume the first address was
|
||||||
|
// successful and there is no way to extract the connected address.
|
||||||
|
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
|
||||||
|
if err != nil {
|
||||||
|
logger.Warningf("balancer.customPickfirstBalancer: failed to create new SubConn: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.subConn != nil {
|
||||||
|
b.cc.RemoveSubConn(b.subConn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy-pasted from pickfirstBalancer.UpdateClientConnState.
|
||||||
|
{
|
||||||
|
b.subConn = sc
|
||||||
|
b.state = connectivity.Idle
|
||||||
|
b.cc.UpdateState(balancer.State{
|
||||||
|
ConnectivityState: connectivity.Idle,
|
||||||
|
Picker: &picker{result: balancer.PickResult{SubConn: b.subConn}},
|
||||||
|
})
|
||||||
|
b.subConn.Connect()
|
||||||
|
}
|
||||||
|
|
||||||
|
b.activeAddr = a
|
||||||
|
|
||||||
|
// We now have a new subConn with one address.
|
||||||
|
// Break the loop and call UpdateClientConnState
|
||||||
|
// with the full set of addresses.
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will load the full set of addresses but leave the
|
||||||
|
// newly created subConn alone.
|
||||||
|
return b.pickfirstBalancer.UpdateClientConnState(state)
|
||||||
|
}
|
|
@ -0,0 +1,189 @@
|
||||||
|
// NOTICE: This file is a copy of grpc's pick_first implementation [1].
|
||||||
|
// It is preserved as-is with the init() removed for easier updating.
|
||||||
|
//
|
||||||
|
// [1]: https://github.com/grpc/grpc-go/blob/v1.49.x/pickfirst.go
|
||||||
|
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright 2017 gRPC authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package balancer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"google.golang.org/grpc/balancer"
|
||||||
|
"google.golang.org/grpc/connectivity"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PickFirstBalancerName is the name of the pick_first balancer.
|
||||||
|
// In this copy the value is "pick_first_original"; it is what
// pickfirstBuilder.Name returns.
const PickFirstBalancerName = "pick_first_original"
|
||||||
|
|
||||||
|
func newPickfirstBuilder() balancer.Builder {
|
||||||
|
return &pickfirstBuilder{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// pickfirstBuilder is the stateless balancer.Builder for pickfirstBalancer.
type pickfirstBuilder struct{}
|
||||||
|
|
||||||
|
func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
|
||||||
|
return &pickfirstBalancer{cc: cc}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*pickfirstBuilder) Name() string {
|
||||||
|
return PickFirstBalancerName
|
||||||
|
}
|
||||||
|
|
||||||
|
type pickfirstBalancer struct {
|
||||||
|
state connectivity.State
|
||||||
|
cc balancer.ClientConn
|
||||||
|
subConn balancer.SubConn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *pickfirstBalancer) ResolverError(err error) {
|
||||||
|
if logger.V(2) {
|
||||||
|
logger.Infof("pickfirstBalancer: ResolverError called with error %v", err)
|
||||||
|
}
|
||||||
|
if b.subConn == nil {
|
||||||
|
b.state = connectivity.TransientFailure
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.state != connectivity.TransientFailure {
|
||||||
|
// The picker will not change since the balancer does not currently
|
||||||
|
// report an error.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
b.cc.UpdateState(balancer.State{
|
||||||
|
ConnectivityState: connectivity.TransientFailure,
|
||||||
|
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
|
||||||
|
if len(state.ResolverState.Addresses) == 0 {
|
||||||
|
// The resolver reported an empty address list. Treat it like an error by
|
||||||
|
// calling b.ResolverError.
|
||||||
|
if b.subConn != nil {
|
||||||
|
// Remove the old subConn. All addresses were removed, so it is no longer
|
||||||
|
// valid.
|
||||||
|
b.cc.RemoveSubConn(b.subConn)
|
||||||
|
b.subConn = nil
|
||||||
|
}
|
||||||
|
b.ResolverError(errors.New("produced zero addresses"))
|
||||||
|
return balancer.ErrBadResolverState
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.subConn != nil {
|
||||||
|
b.cc.UpdateAddresses(b.subConn, state.ResolverState.Addresses)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{})
|
||||||
|
if err != nil {
|
||||||
|
if logger.V(2) {
|
||||||
|
logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
|
||||||
|
}
|
||||||
|
b.state = connectivity.TransientFailure
|
||||||
|
b.cc.UpdateState(balancer.State{
|
||||||
|
ConnectivityState: connectivity.TransientFailure,
|
||||||
|
Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},
|
||||||
|
})
|
||||||
|
return balancer.ErrBadResolverState
|
||||||
|
}
|
||||||
|
b.subConn = subConn
|
||||||
|
b.state = connectivity.Idle
|
||||||
|
b.cc.UpdateState(balancer.State{
|
||||||
|
ConnectivityState: connectivity.Idle,
|
||||||
|
Picker: &picker{result: balancer.PickResult{SubConn: b.subConn}},
|
||||||
|
})
|
||||||
|
b.subConn.Connect()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
|
||||||
|
if logger.V(2) {
|
||||||
|
logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", subConn, state)
|
||||||
|
}
|
||||||
|
if b.subConn != subConn {
|
||||||
|
if logger.V(2) {
|
||||||
|
logger.Infof("pickfirstBalancer: ignored state change because subConn is not recognized")
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
b.state = state.ConnectivityState
|
||||||
|
if state.ConnectivityState == connectivity.Shutdown {
|
||||||
|
b.subConn = nil
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch state.ConnectivityState {
|
||||||
|
case connectivity.Ready:
|
||||||
|
b.cc.UpdateState(balancer.State{
|
||||||
|
ConnectivityState: state.ConnectivityState,
|
||||||
|
Picker: &picker{result: balancer.PickResult{SubConn: subConn}},
|
||||||
|
})
|
||||||
|
case connectivity.Connecting:
|
||||||
|
b.cc.UpdateState(balancer.State{
|
||||||
|
ConnectivityState: state.ConnectivityState,
|
||||||
|
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
|
||||||
|
})
|
||||||
|
case connectivity.Idle:
|
||||||
|
b.cc.UpdateState(balancer.State{
|
||||||
|
ConnectivityState: state.ConnectivityState,
|
||||||
|
Picker: &idlePicker{subConn: subConn},
|
||||||
|
})
|
||||||
|
case connectivity.TransientFailure:
|
||||||
|
b.cc.UpdateState(balancer.State{
|
||||||
|
ConnectivityState: state.ConnectivityState,
|
||||||
|
Picker: &picker{err: state.ConnectionError},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *pickfirstBalancer) Close() {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *pickfirstBalancer) ExitIdle() {
|
||||||
|
if b.subConn != nil && b.state == connectivity.Idle {
|
||||||
|
b.subConn.Connect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type picker struct {
|
||||||
|
result balancer.PickResult
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
|
||||||
|
return p.result, p.err
|
||||||
|
}
|
||||||
|
|
||||||
|
// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
|
||||||
|
// CONNECTING when Pick is called.
|
||||||
|
type idlePicker struct {
|
||||||
|
subConn balancer.SubConn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
|
||||||
|
i.subConn.Connect()
|
||||||
|
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
|
||||||
|
}
|
||||||
|
|
||||||
|
// Intentionally removed
|
||||||
|
// func init() {
|
||||||
|
// balancer.Register(newPickfirstBuilder())
|
||||||
|
// }
|
|
@ -13,6 +13,8 @@ import (
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
|
|
||||||
|
_ "github.com/hashicorp/consul/agent/grpc-internal/balancer"
|
||||||
|
|
||||||
agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
|
agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
|
||||||
"github.com/hashicorp/consul/agent/metadata"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
|
@ -134,6 +136,7 @@ func (c *ClientConnPool) dial(datacenter string, serverType string) (*grpc.Clien
|
||||||
grpc.WithContextDialer(c.dialer),
|
grpc.WithContextDialer(c.dialer),
|
||||||
grpc.WithDisableRetry(),
|
grpc.WithDisableRetry(),
|
||||||
grpc.WithStatsHandler(agentmiddleware.NewStatsHandler(metrics.Default(), metricsLabels)),
|
grpc.WithStatsHandler(agentmiddleware.NewStatsHandler(metrics.Default(), metricsLabels)),
|
||||||
|
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first_custom"}`),
|
||||||
// Keep alive parameters are based on the same default ones we used for
|
// Keep alive parameters are based on the same default ones we used for
|
||||||
// Yamux. These are somewhat arbitrary but we did observe in scale testing
|
// Yamux. These are somewhat arbitrary but we did observe in scale testing
|
||||||
// that the gRPC defaults (servers send keepalives only every 2 hours,
|
// that the gRPC defaults (servers send keepalives only every 2 hours,
|
||||||
|
|
|
@ -271,21 +271,7 @@ func (r *serverResolver) updateAddrs(addrs []resolver.Address) {
|
||||||
// updateAddrsLocked updates this serverResolver's ClientConn to use the given
|
// updateAddrsLocked updates this serverResolver's ClientConn to use the given
|
||||||
// set of addrs. addrLock must be held by caller.
|
// set of addrs. addrLock must be held by caller.
|
||||||
func (r *serverResolver) updateAddrsLocked(addrs []resolver.Address) {
|
func (r *serverResolver) updateAddrsLocked(addrs []resolver.Address) {
|
||||||
// Only pass the first address initially, which will cause the
|
|
||||||
// balancer to spin down the connection for its previous first address
|
|
||||||
// if it is different. If we don't do this, it will keep using the old
|
|
||||||
// first address as long as it is still in the list, making it impossible to
|
|
||||||
// rebalance until that address is removed.
|
|
||||||
var firstAddr []resolver.Address
|
|
||||||
if len(addrs) > 0 {
|
|
||||||
firstAddr = []resolver.Address{addrs[0]}
|
|
||||||
}
|
|
||||||
r.clientConn.UpdateState(resolver.State{Addresses: firstAddr})
|
|
||||||
|
|
||||||
// Call UpdateState again with the entire list of addrs in case we need them
|
|
||||||
// for failover.
|
|
||||||
r.clientConn.UpdateState(resolver.State{Addresses: addrs})
|
r.clientConn.UpdateState(resolver.State{Addresses: addrs})
|
||||||
|
|
||||||
r.addrs = addrs
|
r.addrs = addrs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue