From f8868c7ccf88aeb9c8898110416d45c14da99116 Mon Sep 17 00:00:00 2001 From: "Chris S. Kim" Date: Mon, 19 Dec 2022 12:39:31 -0500 Subject: [PATCH] Add custom balancer to always remove subConns (#15701) The new balancer is a patched version of gRPC's default pick_first balancer which removes the behavior of preserving the active subconnection if a list of new addresses contains the currently active address. --- .changelog/15701.txt | 3 + .../grpc-internal/balancer/custombalancer.go | 87 ++++++++ agent/grpc-internal/balancer/pickfirst.go | 189 ++++++++++++++++++ agent/grpc-internal/client.go | 3 + agent/grpc-internal/resolver/resolver.go | 14 -- 5 files changed, 282 insertions(+), 14 deletions(-) create mode 100644 .changelog/15701.txt create mode 100644 agent/grpc-internal/balancer/custombalancer.go create mode 100644 agent/grpc-internal/balancer/pickfirst.go diff --git a/.changelog/15701.txt b/.changelog/15701.txt new file mode 100644 index 000000000..8caab9cac --- /dev/null +++ b/.changelog/15701.txt @@ -0,0 +1,3 @@ +```release-note:improvement +grpc: Use new balancer implementation to reduce periodic WARN logs when shuffling servers. +``` diff --git a/agent/grpc-internal/balancer/custombalancer.go b/agent/grpc-internal/balancer/custombalancer.go new file mode 100644 index 000000000..cdf3de89b --- /dev/null +++ b/agent/grpc-internal/balancer/custombalancer.go @@ -0,0 +1,87 @@ +package balancer + +import ( + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/resolver" +) + +func init() { + balancer.Register(newCustomPickfirstBuilder()) +} + +// logger is referenced in pickfirst.go. +// The gRPC library uses the same component name. 
+var logger = grpclog.Component("balancer")
+
+func newCustomPickfirstBuilder() balancer.Builder {
+	return &customPickfirstBuilder{}
+}
+
+type customPickfirstBuilder struct{}
+
+func (*customPickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
+	return &customPickfirstBalancer{
+		pickfirstBalancer: pickfirstBalancer{cc: cc},
+	}
+}
+
+func (*customPickfirstBuilder) Name() string {
+	return "pick_first_custom"
+}
+
+// customPickfirstBalancer overrides UpdateClientConnState of pickfirstBalancer.
+type customPickfirstBalancer struct {
+	pickfirstBalancer
+
+	activeAddr resolver.Address // address the current subConn was created with
+}
+
+func (b *customPickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
+	for _, a := range state.ResolverState.Addresses {
+		// Preserve our existing client-side behavior: if the first address of
+		// a shuffled list is the one already in use, the update is a no-op.
+		// Only trust activeAddr while a subConn exists; the embedded balancer
+		// drops the subConn (e.g. on an empty list) without clearing it.
+		if b.subConn != nil && a.Equal(b.activeAddr) {
+			return nil
+		}
+
+		// Attempt to make a new SubConn with a single address so we can
+		// track a successful connection explicitly. If we were to pass
+		// a list of addresses, we cannot assume the first address was
+		// successful and there is no way to extract the connected address.
+		sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
+		if err != nil {
+			logger.Warningf("balancer.customPickfirstBalancer: failed to create new SubConn: %v", err)
+			continue
+		}
+
+		if b.subConn != nil {
+			b.cc.RemoveSubConn(b.subConn)
+		}
+
+		// Copy-pasted from pickfirstBalancer.UpdateClientConnState.
+		{
+			b.subConn = sc
+			b.state = connectivity.Idle
+			b.cc.UpdateState(balancer.State{
+				ConnectivityState: connectivity.Idle,
+				Picker:            &picker{result: balancer.PickResult{SubConn: b.subConn}},
+			})
+			b.subConn.Connect()
+		}
+
+		b.activeAddr = a
+
+		// We now have a new subConn with one address.
+		// Break the loop and call UpdateClientConnState
+		// with the full set of addresses.
+		break
+	}
+
+	// This will load the full set of addresses but leave the
+	// newly created subConn alone.
+	return b.pickfirstBalancer.UpdateClientConnState(state)
+}
diff --git a/agent/grpc-internal/balancer/pickfirst.go b/agent/grpc-internal/balancer/pickfirst.go
new file mode 100644
index 000000000..45edcddce
--- /dev/null
+++ b/agent/grpc-internal/balancer/pickfirst.go
@@ -0,0 +1,189 @@
+// NOTICE: This file is a copy of grpc's pick_first implementation [1].
+// It is preserved as-is with the init() removed for easier updating.
+//
+// [1]: https://github.com/grpc/grpc-go/blob/v1.49.x/pickfirst.go
+
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package balancer
+
+import (
+	"errors"
+	"fmt"
+
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/connectivity"
+)
+
+// PickFirstBalancerName is the name of the pick_first balancer.
+const PickFirstBalancerName = "pick_first_original"
+
+func newPickfirstBuilder() balancer.Builder {
+	return &pickfirstBuilder{}
+}
+
+type pickfirstBuilder struct{}
+
+func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
+	return &pickfirstBalancer{cc: cc}
+}
+
+func (*pickfirstBuilder) Name() string {
+	return PickFirstBalancerName
+}
+
+type pickfirstBalancer struct {
+	state   connectivity.State  // last connectivity state recorded by this balancer
+	cc      balancer.ClientConn // ClientConn handed to Build
+	subConn balancer.SubConn    // current SubConn; nil when none is held
+}
+
+func (b *pickfirstBalancer) ResolverError(err error) {
+	if logger.V(2) {
+		logger.Infof("pickfirstBalancer: ResolverError called with error %v", err)
+	}
+	if b.subConn == nil {
+		b.state = connectivity.TransientFailure
+	}
+
+	if b.state != connectivity.TransientFailure {
+		// The picker will not change since the balancer does not currently
+		// report an error.
+		return
+	}
+	b.cc.UpdateState(balancer.State{
+		ConnectivityState: connectivity.TransientFailure,
+		Picker:            &picker{err: fmt.Errorf("name resolver error: %v", err)},
+	})
+}
+
+func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
+	if len(state.ResolverState.Addresses) == 0 {
+		// The resolver reported an empty address list. Treat it like an error by
+		// calling b.ResolverError.
+		if b.subConn != nil {
+			// Remove the old subConn. All addresses were removed, so it is no longer
+			// valid.
+			b.cc.RemoveSubConn(b.subConn)
+			b.subConn = nil
+		}
+		b.ResolverError(errors.New("produced zero addresses"))
+		return balancer.ErrBadResolverState
+	}
+
+	if b.subConn != nil {
+		b.cc.UpdateAddresses(b.subConn, state.ResolverState.Addresses)
+		return nil
+	}
+
+	subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{})
+	if err != nil {
+		if logger.V(2) {
+			logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
+		}
+		b.state = connectivity.TransientFailure
+		b.cc.UpdateState(balancer.State{
+			ConnectivityState: connectivity.TransientFailure,
+			Picker:            &picker{err: fmt.Errorf("error creating connection: %v", err)},
+		})
+		return balancer.ErrBadResolverState
+	}
+	b.subConn = subConn
+	b.state = connectivity.Idle
+	b.cc.UpdateState(balancer.State{
+		ConnectivityState: connectivity.Idle,
+		Picker:            &picker{result: balancer.PickResult{SubConn: b.subConn}},
+	})
+	b.subConn.Connect()
+	return nil
+}
+
+func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
+	if logger.V(2) {
+		logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", subConn, state)
+	}
+	if b.subConn != subConn {
+		if logger.V(2) {
+			logger.Infof("pickfirstBalancer: ignored state change because subConn is not recognized")
+		}
+		return
+	}
+	b.state = state.ConnectivityState
+	if state.ConnectivityState == connectivity.Shutdown {
+		b.subConn = nil
+		return
+	}
+
+	switch state.ConnectivityState {
+	case connectivity.Ready:
+		b.cc.UpdateState(balancer.State{
+			ConnectivityState: state.ConnectivityState,
+			Picker:            &picker{result: balancer.PickResult{SubConn: subConn}},
+		})
+	case connectivity.Connecting:
+		b.cc.UpdateState(balancer.State{
+			ConnectivityState: state.ConnectivityState,
+			Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
+		})
+	case connectivity.Idle:
+		b.cc.UpdateState(balancer.State{
+			ConnectivityState: state.ConnectivityState,
+			Picker:            &idlePicker{subConn: subConn},
+		})
+	case connectivity.TransientFailure:
+		b.cc.UpdateState(balancer.State{
+			ConnectivityState: state.ConnectivityState,
+			Picker:            &picker{err: state.ConnectionError},
+		})
+	}
+}
+
+func (b *pickfirstBalancer) Close() {
+}
+
+func (b *pickfirstBalancer) ExitIdle() {
+	if b.subConn != nil && b.state == connectivity.Idle {
+		b.subConn.Connect()
+	}
+}
+
+type picker struct {
+	result balancer.PickResult // returned unchanged by every Pick call
+	err    error               // returned unchanged by every Pick call
+}
+
+func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
+	return p.result, p.err
+}
+
+// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
+// CONNECTING when Pick is called.
+type idlePicker struct {
+	subConn balancer.SubConn
+}
+
+func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
+	i.subConn.Connect()
+	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
+}
+
+// Intentionally removed: custombalancer.go's init registers the patched builder instead.
+// func init() {
+// 	balancer.Register(newPickfirstBuilder())
+// }
diff --git a/agent/grpc-internal/client.go b/agent/grpc-internal/client.go
index 18596eec7..9a1e8402a 100644
--- a/agent/grpc-internal/client.go
+++ b/agent/grpc-internal/client.go
@@ -13,6 +13,8 @@ import (
 
 	"github.com/armon/go-metrics"
 
+	_ "github.com/hashicorp/consul/agent/grpc-internal/balancer"
+
 	agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
 	"github.com/hashicorp/consul/agent/metadata"
 	"github.com/hashicorp/consul/agent/pool"
@@ -134,6 +136,7 @@ func (c *ClientConnPool) dial(datacenter string, serverType string) (*grpc.Clien
 		grpc.WithContextDialer(c.dialer),
 		grpc.WithDisableRetry(),
 		grpc.WithStatsHandler(agentmiddleware.NewStatsHandler(metrics.Default(), metricsLabels)),
+		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first_custom"}`),
 		// Keep alive parameters are based on the same default ones we used for
 		// Yamux.
These are somewhat arbitrary but we did observe in scale testing // that the gRPC defaults (servers send keepalives only every 2 hours, diff --git a/agent/grpc-internal/resolver/resolver.go b/agent/grpc-internal/resolver/resolver.go index cf34c990f..87275449e 100644 --- a/agent/grpc-internal/resolver/resolver.go +++ b/agent/grpc-internal/resolver/resolver.go @@ -271,21 +271,7 @@ func (r *serverResolver) updateAddrs(addrs []resolver.Address) { // updateAddrsLocked updates this serverResolver's ClientConn to use the given // set of addrs. addrLock must be held by caller. func (r *serverResolver) updateAddrsLocked(addrs []resolver.Address) { - // Only pass the first address initially, which will cause the - // balancer to spin down the connection for its previous first address - // if it is different. If we don't do this, it will keep using the old - // first address as long as it is still in the list, making it impossible to - // rebalance until that address is removed. - var firstAddr []resolver.Address - if len(addrs) > 0 { - firstAddr = []resolver.Address{addrs[0]} - } - r.clientConn.UpdateState(resolver.State{Addresses: firstAddr}) - - // Call UpdateState again with the entire list of addrs in case we need them - // for failover. r.clientConn.UpdateState(resolver.State{Addresses: addrs}) - r.addrs = addrs }