// The snapshot endpoint is a special non-RPC endpoint that supports streaming
// for taking and restoring snapshots for disaster recovery. This gets wired
// directly into Consul's stream handler, and a new TCP connection is made for
// each request.
//
// This also includes a SnapshotRPC() function, which acts as a lightweight
// client that knows the details of the stream protocol.
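//
// As a rough sketch of the framing (informal, derived from the code below),
// a snapshot connection carries:
//
//	client -> server: [pool.RPCSnapshot byte][msgpack-encoded SnapshotRequest][raw snapshot data, for a restore], then a half-close
//	server -> client: [msgpack-encoded SnapshotResponse][raw snapshot data, for a save]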
package consul

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"time"
"github.com/hashicorp/consul/agent/consul/structs"
|
2017-06-15 13:16:16 +00:00
|
|
|
"github.com/hashicorp/consul/agent/pool"
|
2016-11-04 04:36:25 +00:00
|
|
|
"github.com/hashicorp/consul/snapshot"
|
2016-10-26 02:20:24 +00:00
|
|
|
"github.com/hashicorp/go-msgpack/codec"
|
|
|
|
)
|
|
|
|
|
|
|
|
// dispatchSnapshotRequest takes an incoming request structure with possibly some
// streaming data (for a restore) and returns possibly some streaming data (for
// a snapshot save). We can't use the normal RPC mechanism in a streaming manner
// like this, so we have to dispatch these by hand.
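// Requests for another datacenter, and non-stale requests on a follower, are
// forwarded over a fresh snapshot connection using SnapshotRPC rather than
// the regular RPC forwarding path.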
func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Reader,
	reply *structs.SnapshotResponse) (io.ReadCloser, error) {

	// Perform DC forwarding.
	if dc := args.Datacenter; dc != s.config.Datacenter {
		manager, server, ok := s.router.FindRoute(dc)
		if !ok {
			return nil, structs.ErrNoDCPath
		}

		snap, err := SnapshotRPC(s.connPool, dc, server.Addr, server.UseTLS, args, in, reply)
		if err != nil {
			manager.NotifyFailedServer(server)
			return nil, err
		}

		return snap, nil
	}

	// Perform leader forwarding if required.
	if !args.AllowStale {
		if isLeader, server := s.getLeader(); !isLeader {
			if server == nil {
				return nil, structs.ErrNoLeader
			}
			return SnapshotRPC(s.connPool, args.Datacenter, server.Addr, server.UseTLS, args, in, reply)
		}
	}

	// Verify token is allowed to operate on snapshots. There's only a
	// single ACL sense here (not read and write) since reading gets you
	// all the ACLs and you could escalate from there.
	if acl, err := s.resolveToken(args.Token); err != nil {
		return nil, err
	} else if acl != nil && !acl.Snapshot() {
		return nil, errPermissionDenied
	}

	// Dispatch the operation.
	switch args.Op {
	case structs.SnapshotSave:
		if !args.AllowStale {
			if err := s.consistentRead(); err != nil {
				return nil, err
			}
		}

		// Set the metadata here before we do anything; this should always be
		// pessimistic if we get more data while the snapshot is being taken.
		s.setQueryMeta(&reply.QueryMeta)

		// Take the snapshot and capture the index.
		snap, err := snapshot.New(s.logger, s.raft)
		reply.Index = snap.Index()
		return snap, err

	case structs.SnapshotRestore:
		if args.AllowStale {
			return nil, fmt.Errorf("stale not allowed for restore")
		}

		// Restore the snapshot.
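		// The end of the restore data is signaled by the client's half-close
		// of the connection (see SnapshotRPC), since the size isn't known in
		// advance.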
		if err := snapshot.Restore(s.logger, in, s.raft); err != nil {
			return nil, err
		}

		// Run a barrier so we are sure that our FSM is caught up with
		// any snapshot restore details (it's also part of Raft's restore
		// process but we don't want to depend on that detail for this to
		// be correct). Once that works, we can redo the leader actions
		// so our leader-maintained state will be up to date.
		barrier := s.raft.Barrier(0)
		if err := barrier.Error(); err != nil {
			return nil, err
		}

		// This'll be used for feedback from the leader loop.
		errCh := make(chan error, 1)
		timeoutCh := time.After(time.Minute)

		select {
		// Tell the leader loop to reassert leader actions since we just
		// replaced the state store contents.
		case s.reassertLeaderCh <- errCh:

		// We might have lost leadership while waiting to kick the loop.
		case <-timeoutCh:
			return nil, fmt.Errorf("timed out waiting to re-run leader actions")

		// Make sure we don't get stuck during shutdown
		case <-s.shutdownCh:
		}

		select {
		// Wait for the leader loop to finish up.
		case err := <-errCh:
			if err != nil {
				return nil, err
			}

		// We might have lost leadership while the loop was doing its
		// thing.
		case <-timeoutCh:
			return nil, fmt.Errorf("timed out waiting for re-run of leader actions")

		// Make sure we don't get stuck during shutdown
		case <-s.shutdownCh:
		}

		// Give the caller back an empty reader since there's nothing to
		// stream back.
		return ioutil.NopCloser(bytes.NewReader([]byte(""))), nil

	default:
		return nil, fmt.Errorf("unrecognized snapshot op %q", args.Op)
	}
}

// handleSnapshotRequest reads the request from the conn and dispatches it. This
// will be called from a goroutine after an incoming stream is determined to be
// a snapshot request.
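// Once the request header has been decoded, the response header is always
// written back over the conn, followed by the raw snapshot data when the
// request was a successful save.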
func (s *Server) handleSnapshotRequest(conn net.Conn) error {
	var args structs.SnapshotRequest
	dec := codec.NewDecoder(conn, &codec.MsgpackHandle{})
	if err := dec.Decode(&args); err != nil {
		return fmt.Errorf("failed to decode request: %v", err)
	}

	var reply structs.SnapshotResponse
	snap, err := s.dispatchSnapshotRequest(&args, conn, &reply)
	if err != nil {
		reply.Error = err.Error()
		goto RESPOND
	}
	defer func() {
		if err := snap.Close(); err != nil {
			s.logger.Printf("[ERR] consul: Failed to close snapshot: %v", err)
		}
	}()

RESPOND:
	enc := codec.NewEncoder(conn, &codec.MsgpackHandle{})
	if err := enc.Encode(&reply); err != nil {
		return fmt.Errorf("failed to encode response: %v", err)
	}
	if snap != nil {
		if _, err := io.Copy(conn, snap); err != nil {
			return fmt.Errorf("failed to stream snapshot: %v", err)
		}
	}

	return nil
}

// SnapshotRPC is a streaming client function for performing a snapshot RPC
// request to a remote server. It will create a fresh connection for each
// request, send the request header, and then stream in any data from the
// reader (for a restore). It will then parse the received response header, and
// if there's no error will return an io.ReadCloser (that you must close) with
// the streaming output (for a snapshot). If the reply contains an error, this
// will always return an error as well, so you don't need to check the error
// inside the filled-in reply.
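//
// A minimal usage sketch for a save (the pool and address values here are
// placeholders, not part of this package):
//
//	var reply structs.SnapshotResponse
//	args := &structs.SnapshotRequest{Datacenter: "dc1", Op: structs.SnapshotSave}
//	snap, err := SnapshotRPC(connPool, "dc1", serverAddr, false, args, bytes.NewReader(nil), &reply)
//	if err != nil {
//		return err
//	}
//	defer snap.Close()
//	// Read the snapshot contents from snap.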
func SnapshotRPC(connPool *pool.ConnPool, dc string, addr net.Addr, useTLS bool,
	args *structs.SnapshotRequest, in io.Reader, reply *structs.SnapshotResponse) (io.ReadCloser, error) {

	conn, hc, err := connPool.DialTimeout(dc, addr, 10*time.Second, useTLS)
	if err != nil {
		return nil, err
	}

	// keep will disarm the defer on success if we are returning the caller
	// our connection to stream the output.
	var keep bool
	defer func() {
		if !keep {
			conn.Close()
		}
	}()

	// Write the snapshot RPC byte to set the mode, then perform the
	// request.
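	// The pool.RPCSnapshot byte is what tells the server's stream handler to
	// hand this connection to handleSnapshotRequest.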
	if _, err := conn.Write([]byte{byte(pool.RPCSnapshot)}); err != nil {
		return nil, fmt.Errorf("failed to write stream type: %v", err)
	}

	// Push the header encoded as msgpack, then stream the input.
	enc := codec.NewEncoder(conn, &codec.MsgpackHandle{})
	if err := enc.Encode(&args); err != nil {
		return nil, fmt.Errorf("failed to encode request: %v", err)
	}
	if _, err := io.Copy(conn, in); err != nil {
		return nil, fmt.Errorf("failed to copy snapshot in: %v", err)
	}

	// Our RPC protocol requires support for a half-close in order to signal
	// the other side that they are done reading the stream, since we don't
	// know the size in advance. This saves us from having to buffer just to
	// calculate the size.
	if hc != nil {
		if err := hc.CloseWrite(); err != nil {
			return nil, fmt.Errorf("failed to half close snapshot connection: %v", err)
		}
	} else {
		return nil, fmt.Errorf("snapshot connection requires half-close support")
	}

	// Pull the header decoded as msgpack. The caller can continue to read
	// the conn to stream the remaining data.
	dec := codec.NewDecoder(conn, &codec.MsgpackHandle{})
	if err := dec.Decode(reply); err != nil {
		return nil, fmt.Errorf("failed to decode response: %v", err)
	}
	if reply.Error != "" {
		return nil, errors.New(reply.Error)
	}

	keep = true
	return conn, nil
}