open-consul/consul/rpc.go
James Phillips 8fcf695fa1
Pass state store pointer into the blocking query work function.
Previously the blocking functions all closed over the state store from
their first query, with would not have worked properly when a restore
occurred. This makes sure they get a frest state store pointer each time,
and that pointer is synchronized with the abandon watch.
2017-01-25 09:58:23 -08:00

441 lines
12 KiB
Go

package consul
import (
"crypto/tls"
"fmt"
"io"
"math/rand"
"net"
"strings"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/yamux"
)
type RPCType byte
const (
rpcConsul RPCType = iota
rpcRaft
rpcMultiplex // Old Muxado byte, no longer supported.
rpcTLS
rpcMultiplexV2
rpcSnapshot
)
const (
// maxQueryTime is used to bound the limit of a blocking query
maxQueryTime = 600 * time.Second
// defaultQueryTime is the amount of time we block waiting for a change
// if no time is specified. Previously we would wait the maxQueryTime.
defaultQueryTime = 300 * time.Second
// jitterFraction is a the limit to the amount of jitter we apply
// to a user specified MaxQueryTime. We divide the specified time by
// the fraction. So 16 == 6.25% limit of jitter. This same fraction
// is applied to the RPCHoldTimeout
jitterFraction = 16
// Warn if the Raft command is larger than this.
// If it's over 1MB something is probably being abusive.
raftWarnSize = 1024 * 1024
// enqueueLimit caps how long we will wait to enqueue
// a new Raft command. Something is probably wrong if this
// value is ever reached. However, it prevents us from blocking
// the requesting goroutine forever.
enqueueLimit = 30 * time.Second
)
// listen is used to listen for incoming RPC connections
func (s *Server) listen() {
for {
// Accept a connection
conn, err := s.rpcListener.Accept()
if err != nil {
if s.shutdown {
return
}
s.logger.Printf("[ERR] consul.rpc: failed to accept RPC conn: %v", err)
continue
}
go s.handleConn(conn, false)
metrics.IncrCounter([]string{"consul", "rpc", "accept_conn"}, 1)
}
}
// logConn is a wrapper around memberlist's LogConn so that we format references
// to "from" addresses in a consistent way. This is just a shorter name.
func logConn(conn net.Conn) string {
return memberlist.LogConn(conn)
}
// handleConn is used to determine if this is a Raft or
// Consul type RPC connection and invoke the correct handler
func (s *Server) handleConn(conn net.Conn, isTLS bool) {
// Read a single byte
buf := make([]byte, 1)
if _, err := conn.Read(buf); err != nil {
if err != io.EOF {
s.logger.Printf("[ERR] consul.rpc: failed to read byte: %v %s", err, logConn(conn))
}
conn.Close()
return
}
// Enforce TLS if VerifyIncoming is set
if s.config.VerifyIncoming && !isTLS && RPCType(buf[0]) != rpcTLS {
s.logger.Printf("[WARN] consul.rpc: Non-TLS connection attempted with VerifyIncoming set %s", logConn(conn))
conn.Close()
return
}
// Switch on the byte
switch RPCType(buf[0]) {
case rpcConsul:
s.handleConsulConn(conn)
case rpcRaft:
metrics.IncrCounter([]string{"consul", "rpc", "raft_handoff"}, 1)
s.raftLayer.Handoff(conn)
case rpcTLS:
if s.rpcTLS == nil {
s.logger.Printf("[WARN] consul.rpc: TLS connection attempted, server not configured for TLS %s", logConn(conn))
conn.Close()
return
}
conn = tls.Server(conn, s.rpcTLS)
s.handleConn(conn, true)
case rpcMultiplexV2:
s.handleMultiplexV2(conn)
case rpcSnapshot:
s.handleSnapshotConn(conn)
default:
s.logger.Printf("[ERR] consul.rpc: unrecognized RPC byte: %v %s", buf[0], logConn(conn))
conn.Close()
return
}
}
// handleMultiplexV2 is used to multiplex a single incoming connection
// using the Yamux multiplexer
func (s *Server) handleMultiplexV2(conn net.Conn) {
defer conn.Close()
conf := yamux.DefaultConfig()
conf.LogOutput = s.config.LogOutput
server, _ := yamux.Server(conn, conf)
for {
sub, err := server.Accept()
if err != nil {
if err != io.EOF {
s.logger.Printf("[ERR] consul.rpc: multiplex conn accept failed: %v %s", err, logConn(conn))
}
return
}
go s.handleConsulConn(sub)
}
}
// handleConsulConn is used to service a single Consul RPC connection
func (s *Server) handleConsulConn(conn net.Conn) {
defer conn.Close()
rpcCodec := msgpackrpc.NewServerCodec(conn)
for {
select {
case <-s.shutdownCh:
return
default:
}
if err := s.rpcServer.ServeRequest(rpcCodec); err != nil {
if err != io.EOF && !strings.Contains(err.Error(), "closed") {
s.logger.Printf("[ERR] consul.rpc: RPC error: %v %s", err, logConn(conn))
metrics.IncrCounter([]string{"consul", "rpc", "request_error"}, 1)
}
return
}
metrics.IncrCounter([]string{"consul", "rpc", "request"}, 1)
}
}
// handleSnapshotConn is used to dispatch snapshot saves and restores, which
// stream so don't use the normal RPC mechanism.
func (s *Server) handleSnapshotConn(conn net.Conn) {
go func() {
defer conn.Close()
if err := s.handleSnapshotRequest(conn); err != nil {
s.logger.Printf("[ERR] consul.rpc: Snapshot RPC error: %v %s", err, logConn(conn))
}
}()
}
// forward is used to forward to a remote DC or to forward to the local leader
// Returns a bool of if forwarding was performed, as well as any error
func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
var firstCheck time.Time
// Handle DC forwarding
dc := info.RequestDatacenter()
if dc != s.config.Datacenter {
err := s.forwardDC(method, dc, args, reply)
return true, err
}
// Check if we can allow a stale read
if info.IsRead() && info.AllowStaleRead() {
return false, nil
}
CHECK_LEADER:
// Find the leader
isLeader, remoteServer := s.getLeader()
// Handle the case we are the leader
if isLeader {
return false, nil
}
// Handle the case of a known leader
if remoteServer != nil {
err := s.forwardLeader(remoteServer, method, args, reply)
return true, err
}
// Gate the request until there is a leader
if firstCheck.IsZero() {
firstCheck = time.Now()
}
if time.Now().Sub(firstCheck) < s.config.RPCHoldTimeout {
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
select {
case <-time.After(jitter):
goto CHECK_LEADER
case <-s.shutdownCh:
}
}
// No leader found and hold time exceeded
return true, structs.ErrNoLeader
}
// getLeader returns if the current node is the leader, and if not then it
// returns the leader which is potentially nil if the cluster has not yet
// elected a leader.
func (s *Server) getLeader() (bool, *agent.Server) {
// Check if we are the leader
if s.IsLeader() {
return true, nil
}
// Get the leader
leader := s.raft.Leader()
if leader == "" {
return false, nil
}
// Lookup the server
s.localLock.RLock()
server := s.localConsuls[leader]
s.localLock.RUnlock()
// Server could be nil
return false, server
}
// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
func (s *Server) forwardLeader(server *agent.Server, method string, args interface{}, reply interface{}) error {
// Handle a missing server
if server == nil {
return structs.ErrNoLeader
}
return s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, method, args, reply)
}
// getRemoteServer returns a random server from a remote datacenter. This uses
// the bool parameter to signal that none were available.
func (s *Server) getRemoteServer(dc string) (*agent.Server, bool) {
s.remoteLock.RLock()
defer s.remoteLock.RUnlock()
servers := s.remoteConsuls[dc]
if len(servers) == 0 {
return nil, false
}
offset := rand.Int31n(int32(len(servers)))
server := servers[offset]
return server, true
}
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error {
server, ok := s.getRemoteServer(dc)
if !ok {
s.logger.Printf("[WARN] consul.rpc: RPC request for DC '%s', no path found", dc)
return structs.ErrNoDCPath
}
metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
return s.connPool.RPC(dc, server.Addr, server.Version, method, args, reply)
}
// globalRPC is used to forward an RPC request to one server in each datacenter.
// This will only error for RPC-related errors. Otherwise, application-level
// errors can be sent in the response objects.
func (s *Server) globalRPC(method string, args interface{},
reply structs.CompoundResponse) error {
errorCh := make(chan error)
respCh := make(chan interface{})
// Make a new request into each datacenter
s.remoteLock.RLock()
dcs := make([]string, 0, len(s.remoteConsuls))
for dc, _ := range s.remoteConsuls {
dcs = append(dcs, dc)
}
s.remoteLock.RUnlock()
for _, dc := range dcs {
go func(dc string) {
rr := reply.New()
if err := s.forwardDC(method, dc, args, &rr); err != nil {
errorCh <- err
return
}
respCh <- rr
}(dc)
}
replies, total := 0, len(s.remoteConsuls)
for replies < total {
select {
case err := <-errorCh:
return err
case rr := <-respCh:
reply.Add(rr)
replies++
}
}
return nil
}
// raftApply is used to encode a message, run it through raft, and return
// the FSM response along with any errors
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {
buf, err := structs.Encode(t, msg)
if err != nil {
return nil, fmt.Errorf("Failed to encode request: %v", err)
}
// Warn if the command is very large
if n := len(buf); n > raftWarnSize {
s.logger.Printf("[WARN] consul: Attempting to apply large raft entry (%d bytes)", n)
}
future := s.raft.Apply(buf, enqueueLimit)
if err := future.Error(); err != nil {
return nil, err
}
return future.Response(), nil
}
// queryFn is used to perform a query operation. If a re-query is needed, the
// passed-in watch set will be used to block for changes.
type queryFn func(memdb.WatchSet, *state.StateStore) error
// blockingQuery is used to process a potentially blocking query operation.
func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
fn queryFn) error {
var timeout *time.Timer
// Fast path right to the non-blocking query.
if queryOpts.MinQueryIndex == 0 {
goto RUN_QUERY
}
// Restrict the max query time, and ensure there is always one.
if queryOpts.MaxQueryTime > maxQueryTime {
queryOpts.MaxQueryTime = maxQueryTime
} else if queryOpts.MaxQueryTime <= 0 {
queryOpts.MaxQueryTime = defaultQueryTime
}
// Apply a small amount of jitter to the request.
queryOpts.MaxQueryTime += lib.RandomStagger(queryOpts.MaxQueryTime / jitterFraction)
// Setup a query timeout.
timeout = time.NewTimer(queryOpts.MaxQueryTime)
defer timeout.Stop()
RUN_QUERY:
// Update the query metadata.
s.setQueryMeta(queryMeta)
// If the read must be consistent we verify that we are still the leader.
if queryOpts.RequireConsistent {
if err := s.consistentRead(); err != nil {
return err
}
}
// Run the query.
metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1)
// Operate on a consistent set of state. This makes sure that the
// abandon channel goes with the state that the caller is using to
// build watches.
state := s.fsm.State()
// We can skip all watch tracking if this isn't a blocking query.
var ws memdb.WatchSet
if queryOpts.MinQueryIndex > 0 {
ws = memdb.NewWatchSet()
// This channel will be closed if a snapshot is restored and the
// whole state store is abandoned.
ws.Add(state.AbandonCh())
}
// Block up to the timeout if we didn't see anything fresh.
err := fn(ws, state)
if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
if expired := ws.Watch(timeout.C); !expired {
goto RUN_QUERY
}
}
return err
}
// setQueryMeta is used to populate the QueryMeta data for an RPC call
func (s *Server) setQueryMeta(m *structs.QueryMeta) {
if s.IsLeader() {
m.LastContact = 0
m.KnownLeader = true
} else {
m.LastContact = time.Now().Sub(s.raft.LastContact())
m.KnownLeader = (s.raft.Leader() != "")
}
}
// consistentRead is used to ensure we do not perform a stale
// read. This is done by verifying leadership before the read.
func (s *Server) consistentRead() error {
defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now())
future := s.raft.VerifyLeader()
return future.Error()
}