Adds support for snapshots and restores. (#2396)

* Updates Raft library to get new snapshot/restore API.

* Basic backup and restore working, but need some cleanup.

* Breaks out a snapshot module and adds a SHA256 integrity check.

* Adds snapshot ACL and fills in some missing comments.

* Require a consistent read for snapshots.

* Make sure snapshot works if ACLs aren't enabled.

* Adds a bit of package documentation.

* Returns an empty response from restore to avoid EOF errors.

* Adds API client support for snapshots.

* Makes internal file names match on-disk file snapshots.

* Adds DC and token coverage for snapshot API test.

* Adds missing documentation.

* Adds a unit test for the snapshot client endpoint.

* Moves the connection pool out of the client for easier testing.

* Fixes an incidental issue in the prepared query unit test.

I realized I had two servers in bootstrap mode so this wasn't a good setup.

* Adds a half close to the TCP stream and fixes panic on error.

* Adds client and endpoint tests for snapshots.

* Moves the pool back into the snapshot RPC client.

* Adds a TLS test and fixes half-closes for TLS connections.

* Tweaks some comments.

* Adds a low-level snapshot test.

This is independent of Consul so we can pull this out into a library
later if we want to.

* Cleans up snapshot and archive and completes archive tests.

* Sends a clear error for snapshot operations in dev mode.

Snapshots require the Raft snapshots to be readable, which isn't supported
in dev mode. Send a clear error instead of a deep-down Raft one.

* Adds docs for the snapshot endpoint.

* Adds a stale mode and index feedback for snapshot saves.

This gives folks a way to extract data even if the cluster has no
leader.

* Changes the internal format of a snapshot from zip to tgz.

* Pulls in Raft fix to cancel inflight before a restore.

* Pulls in new Raft restore interface.

* Adds metadata to snapshot saves and a verify function.

* Adds basic save and restore snapshot CLI commands.

* Gets rid of tarball extensions and adds restore message.

* Fixes an incidental bad link in the KV docs.

* Adds documentation for the snapshot CLI commands.

* Scuttle any request body when a snapshot is saved.

* Fixes archive unit test error message check.

* Allows for nil output writers in snapshot RPC handlers.

* Renames hash list Decode to DecodeAndVerify.

* Closes the client connection for snapshot ops.

* Lowers timeout for restore ops.

* Updates Raft vendor to get new Restore signature and integrates with Consul.

* Bounces the leader's internal state when we do a restore.
James Phillips, 2016-10-25 19:20:24 -07:00, committed by GitHub
parent 8f31e8b205, commit bc29610124
50 changed files with 3396 additions and 182 deletions


@ -86,6 +86,9 @@ type ACL interface {
// ACLModify checks for permission to manipulate ACLs
ACLModify() bool
// Snapshot checks for permission to take and restore snapshots.
Snapshot() bool
}
// StaticACL is used to implement a base ACL policy. It either
@ -156,6 +159,10 @@ func (s *StaticACL) ACLModify() bool {
return s.allowManage
}
func (s *StaticACL) Snapshot() bool {
return s.allowManage
}
// AllowAll returns an ACL rule that allows all operations
func AllowAll() ACL {
return allowAll
@ -474,3 +481,8 @@ func (p *PolicyACL) ACLList() bool {
func (p *PolicyACL) ACLModify() bool {
return p.parent.ACLModify()
}
// Snapshot checks if taking and restoring snapshots is allowed.
func (p *PolicyACL) Snapshot() bool {
return p.parent.Snapshot()
}
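To make the new permission concrete, here is a stand-alone sketch of the gating pattern (the aclRule and staticRule types are illustrative stand-ins for the real consul/acl types, and guardSnapshot is hypothetical): snapshot save and restore are only permitted when the resolved policy grants management-level access.

package main

import (
	"errors"
	"fmt"
)

// aclRule is a stand-in for the slice of the ACL interface this change adds.
type aclRule interface {
	Snapshot() bool
}

// staticRule mirrors StaticACL above: snapshots ride on the manage permission.
type staticRule struct{ allowManage bool }

func (s staticRule) Snapshot() bool { return s.allowManage }

// guardSnapshot is a hypothetical helper showing how an endpoint could gate a
// save or restore on the resolved rule.
func guardSnapshot(rule aclRule) error {
	if rule != nil && !rule.Snapshot() {
		return errors.New("Permission denied")
	}
	return nil
}

func main() {
	fmt.Println(guardSnapshot(staticRule{allowManage: false})) // Permission denied
	fmt.Println(guardSnapshot(staticRule{allowManage: true}))  // <nil>
}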


@ -77,6 +77,9 @@ func TestStaticACL(t *testing.T) {
if all.ACLModify() {
t.Fatalf("should not allow")
}
if all.Snapshot() {
t.Fatalf("should not allow")
}
if none.KeyRead("foobar") {
t.Fatalf("should not allow")
@ -126,6 +129,9 @@ func TestStaticACL(t *testing.T) {
if none.ACLModify() {
t.Fatalf("should not allow")
}
if none.Snapshot() {
t.Fatalf("should not allow")
}
if !manage.KeyRead("foobar") {
t.Fatalf("should allow")
@ -169,6 +175,9 @@ func TestStaticACL(t *testing.T) {
if !manage.ACLModify() {
t.Fatalf("should allow")
}
if !manage.Snapshot() {
t.Fatalf("should allow")
}
}
func TestPolicyACL(t *testing.T) {
@ -495,6 +504,9 @@ func TestPolicyACL_Parent(t *testing.T) {
if acl.ACLModify() {
t.Fatalf("should not allow")
}
if acl.Snapshot() {
t.Fatalf("should not allow")
}
}
func TestPolicyACL_Keyring(t *testing.T) {

api/snapshot.go (new file, 47 lines)

@ -0,0 +1,47 @@
package api
import (
"io"
)
// Snapshot can be used to query the /v1/snapshot endpoint to take snapshots of
// Consul's internal state and restore snapshots for disaster recovery.
type Snapshot struct {
c *Client
}
// Snapshot returns a handle that exposes the snapshot endpoints.
func (c *Client) Snapshot() *Snapshot {
return &Snapshot{c}
}
// Save requests a new snapshot and provides an io.ReadCloser with the snapshot
// data to save. If this doesn't return an error, then it's the responsibility
// of the caller to close it. Only a subset of the QueryOptions are supported:
// Datacenter, AllowStale, and Token.
func (s *Snapshot) Save(q *QueryOptions) (io.ReadCloser, *QueryMeta, error) {
r := s.c.newRequest("GET", "/v1/snapshot")
r.setQueryOptions(q)
rtt, resp, err := requireOK(s.c.doRequest(r))
if err != nil {
return nil, nil, err
}
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
return resp.Body, qm, nil
}
// Restore streams in an existing snapshot and attempts to restore it.
func (s *Snapshot) Restore(q *WriteOptions, in io.Reader) error {
r := s.c.newRequest("PUT", "/v1/snapshot")
r.body = in
r.setWriteOptions(q)
_, _, err := requireOK(s.c.doRequest(r))
if err != nil {
return err
}
return nil
}
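As a rough usage sketch of the client surface above (the agent address, file name, and error handling are illustrative, not part of this change), a caller could save a snapshot to disk and later stream it back for a restore like so:

package main

import (
	"fmt"
	"io"
	"os"

	"github.com/hashicorp/consul/api"
)

func main() {
	// Connect to a local agent using the default config (assumed reachable).
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		panic(err)
	}
	snap := client.Snapshot()

	// Save: on success the caller owns the ReadCloser and must close it.
	rc, qm, err := snap.Save(nil)
	if err != nil {
		panic(err)
	}
	defer rc.Close()

	f, err := os.Create("backup.snap")
	if err != nil {
		panic(err)
	}
	if _, err := io.Copy(f, rc); err != nil {
		panic(err)
	}
	if err := f.Close(); err != nil {
		panic(err)
	}
	fmt.Printf("saved snapshot at index %d\n", qm.LastIndex)

	// Restore: stream the same file back through the PUT endpoint.
	in, err := os.Open("backup.snap")
	if err != nil {
		panic(err)
	}
	defer in.Close()
	if err := snap.Restore(nil, in); err != nil {
		panic(err)
	}
}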

api/snapshot_test.go (new file, 134 lines)

@ -0,0 +1,134 @@
package api
import (
"bytes"
"strings"
"testing"
)
func TestSnapshot(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
// Place an initial key into the store.
kv := c.KV()
key := &KVPair{Key: testKey(), Value: []byte("hello")}
if _, err := kv.Put(key, nil); err != nil {
t.Fatalf("err: %v", err)
}
// Make sure it reads back.
pair, _, err := kv.Get(key.Key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair == nil {
t.Fatalf("expected value: %#v", pair)
}
if !bytes.Equal(pair.Value, []byte("hello")) {
t.Fatalf("unexpected value: %#v", pair)
}
// Take a snapshot.
snapshot := c.Snapshot()
snap, qm, err := snapshot.Save(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
defer snap.Close()
// Sanity check the query metadata.
if qm.LastIndex == 0 || !qm.KnownLeader ||
qm.RequestTime == 0 {
t.Fatalf("bad: %v", qm)
}
// Overwrite the key's value.
key.Value = []byte("goodbye")
if _, err := kv.Put(key, nil); err != nil {
t.Fatalf("err: %v", err)
}
// Read the key back and look for the new value.
pair, _, err = kv.Get(key.Key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair == nil {
t.Fatalf("expected value: %#v", pair)
}
if !bytes.Equal(pair.Value, []byte("goodbye")) {
t.Fatalf("unexpected value: %#v", pair)
}
// Restore the snapshot.
if err := snapshot.Restore(nil, snap); err != nil {
t.Fatalf("err: %v", err)
}
// Read the key back and look for the original value.
pair, _, err = kv.Get(key.Key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair == nil {
t.Fatalf("expected value: %#v", pair)
}
if !bytes.Equal(pair.Value, []byte("hello")) {
t.Fatalf("unexpected value: %#v", pair)
}
}
func TestSnapshot_Options(t *testing.T) {
t.Parallel()
c, s := makeACLClient(t)
defer s.Stop()
// Try to take a snapshot with a bad token.
snapshot := c.Snapshot()
_, _, err := snapshot.Save(&QueryOptions{Token: "anonymous"})
if err == nil || !strings.Contains(err.Error(), "Permission denied") {
t.Fatalf("err: %v", err)
}
// Now try an unknown DC.
_, _, err = snapshot.Save(&QueryOptions{Datacenter: "nope"})
if err == nil || !strings.Contains(err.Error(), "No path to datacenter") {
t.Fatalf("err: %v", err)
}
// This should work with a valid token.
snap, _, err := snapshot.Save(&QueryOptions{Token: "root"})
if err != nil {
t.Fatalf("err: %v", err)
}
defer snap.Close()
// This should work with a stale snapshot. This doesn't have good feedback
// that the stale option was sent, but it makes sure nothing bad happens.
snap, _, err = snapshot.Save(&QueryOptions{Token: "root", AllowStale: true})
if err != nil {
t.Fatalf("err: %v", err)
}
defer snap.Close()
// Try to restore a snapshot with a bad token.
null := bytes.NewReader([]byte(""))
err = snapshot.Restore(&WriteOptions{Token: "anonymous"}, null)
if err == nil || !strings.Contains(err.Error(), "Permission denied") {
t.Fatalf("err: %v", err)
}
// Now try an unknown DC.
null = bytes.NewReader([]byte(""))
err = snapshot.Restore(&WriteOptions{Datacenter: "nope"}, null)
if err == nil || !strings.Contains(err.Error(), "No path to datacenter") {
t.Fatalf("err: %v", err)
}
// This should work.
if err := snapshot.Restore(&WriteOptions{Token: "root"}, snap); err != nil {
t.Fatalf("err: %v", err)
}
}


@ -469,6 +469,19 @@ func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
return a.client.RPC(method, args, reply)
}
// SnapshotRPC performs the requested snapshot RPC against the Consul server in
// a streaming manner. The contents of in will be read and passed along as the
// payload, and the response message will determine the error status, and any
// return payload will be written to out.
func (a *Agent) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
replyFn consul.SnapshotReplyFn) error {
if a.server != nil {
return a.server.SnapshotRPC(args, in, out, replyFn)
}
return a.client.SnapshotRPC(args, in, out, replyFn)
}
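A minimal sketch of driving this from code that already holds an *Agent, written as if it lived in this package and assuming "bytes" and consul/structs imports; the datacenter argument, the in-memory destination buffer, and the helper name are assumptions. The replyFn hook lets the caller capture the reply metadata before the snapshot body streams.

// takeSnapshotToBuffer is a hypothetical helper, not part of this change.
func takeSnapshotToBuffer(a *Agent, dc string) (*bytes.Buffer, uint64, error) {
	var out bytes.Buffer
	args := structs.SnapshotRequest{
		Datacenter: dc,
		Op:         structs.SnapshotSave,
	}
	var index uint64
	replyFn := func(reply *structs.SnapshotResponse) error {
		// Capture the index before the snapshot data is streamed into out.
		index = reply.QueryMeta.Index
		return nil
	}
	if err := a.SnapshotRPC(&args, bytes.NewReader(nil), &out, replyFn); err != nil {
		return nil, 0, err
	}
	return &out, index, nil
}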
// Leave is used to prepare the agent for a graceful shutdown
func (a *Agent) Leave() error {
if a.server != nil {


@ -231,64 +231,7 @@ func (s *HTTPServer) handleFuncMetrics(pattern string, handler func(http.Respons
func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/", s.Index)
s.handleFuncMetrics("/v1/status/leader", s.wrap(s.StatusLeader))
s.handleFuncMetrics("/v1/status/peers", s.wrap(s.StatusPeers))
s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration))
s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer))
s.handleFuncMetrics("/v1/catalog/register", s.wrap(s.CatalogRegister))
s.handleFuncMetrics("/v1/catalog/deregister", s.wrap(s.CatalogDeregister))
s.handleFuncMetrics("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters))
s.handleFuncMetrics("/v1/catalog/nodes", s.wrap(s.CatalogNodes))
s.handleFuncMetrics("/v1/catalog/services", s.wrap(s.CatalogServices))
s.handleFuncMetrics("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes))
s.handleFuncMetrics("/v1/catalog/node/", s.wrap(s.CatalogNodeServices))
if !s.agent.config.DisableCoordinates {
s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters))
s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes))
} else {
s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled))
s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled))
}
s.handleFuncMetrics("/v1/health/node/", s.wrap(s.HealthNodeChecks))
s.handleFuncMetrics("/v1/health/checks/", s.wrap(s.HealthServiceChecks))
s.handleFuncMetrics("/v1/health/state/", s.wrap(s.HealthChecksInState))
s.handleFuncMetrics("/v1/health/service/", s.wrap(s.HealthServiceNodes))
s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf))
s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance))
s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices))
s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks))
s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers))
s.handleFuncMetrics("/v1/agent/join/", s.wrap(s.AgentJoin))
s.handleFuncMetrics("/v1/agent/force-leave/", s.wrap(s.AgentForceLeave))
s.handleFuncMetrics("/v1/agent/check/register", s.wrap(s.AgentRegisterCheck))
s.handleFuncMetrics("/v1/agent/check/deregister/", s.wrap(s.AgentDeregisterCheck))
s.handleFuncMetrics("/v1/agent/check/pass/", s.wrap(s.AgentCheckPass))
s.handleFuncMetrics("/v1/agent/check/warn/", s.wrap(s.AgentCheckWarn))
s.handleFuncMetrics("/v1/agent/check/fail/", s.wrap(s.AgentCheckFail))
s.handleFuncMetrics("/v1/agent/check/update/", s.wrap(s.AgentCheckUpdate))
s.handleFuncMetrics("/v1/agent/service/register", s.wrap(s.AgentRegisterService))
s.handleFuncMetrics("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService))
s.handleFuncMetrics("/v1/agent/service/maintenance/", s.wrap(s.AgentServiceMaintenance))
s.handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire))
s.handleFuncMetrics("/v1/event/list", s.wrap(s.EventList))
s.handleFuncMetrics("/v1/kv/", s.wrap(s.KVSEndpoint))
s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate))
s.handleFuncMetrics("/v1/session/destroy/", s.wrap(s.SessionDestroy))
s.handleFuncMetrics("/v1/session/renew/", s.wrap(s.SessionRenew))
s.handleFuncMetrics("/v1/session/info/", s.wrap(s.SessionGet))
s.handleFuncMetrics("/v1/session/node/", s.wrap(s.SessionsForNode))
s.handleFuncMetrics("/v1/session/list", s.wrap(s.SessionList))
// API V1.
if s.agent.config.ACLDatacenter != "" {
s.handleFuncMetrics("/v1/acl/create", s.wrap(s.ACLCreate))
s.handleFuncMetrics("/v1/acl/update", s.wrap(s.ACLUpdate))
@ -306,12 +249,62 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.handleFuncMetrics("/v1/acl/list", s.wrap(aclDisabled))
s.handleFuncMetrics("/v1/acl/replication", s.wrap(aclDisabled))
}
s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf))
s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance))
s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices))
s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks))
s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers))
s.handleFuncMetrics("/v1/agent/join/", s.wrap(s.AgentJoin))
s.handleFuncMetrics("/v1/agent/force-leave/", s.wrap(s.AgentForceLeave))
s.handleFuncMetrics("/v1/agent/check/register", s.wrap(s.AgentRegisterCheck))
s.handleFuncMetrics("/v1/agent/check/deregister/", s.wrap(s.AgentDeregisterCheck))
s.handleFuncMetrics("/v1/agent/check/pass/", s.wrap(s.AgentCheckPass))
s.handleFuncMetrics("/v1/agent/check/warn/", s.wrap(s.AgentCheckWarn))
s.handleFuncMetrics("/v1/agent/check/fail/", s.wrap(s.AgentCheckFail))
s.handleFuncMetrics("/v1/agent/check/update/", s.wrap(s.AgentCheckUpdate))
s.handleFuncMetrics("/v1/agent/service/register", s.wrap(s.AgentRegisterService))
s.handleFuncMetrics("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService))
s.handleFuncMetrics("/v1/agent/service/maintenance/", s.wrap(s.AgentServiceMaintenance))
s.handleFuncMetrics("/v1/catalog/register", s.wrap(s.CatalogRegister))
s.handleFuncMetrics("/v1/catalog/deregister", s.wrap(s.CatalogDeregister))
s.handleFuncMetrics("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters))
s.handleFuncMetrics("/v1/catalog/nodes", s.wrap(s.CatalogNodes))
s.handleFuncMetrics("/v1/catalog/services", s.wrap(s.CatalogServices))
s.handleFuncMetrics("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes))
s.handleFuncMetrics("/v1/catalog/node/", s.wrap(s.CatalogNodeServices))
if !s.agent.config.DisableCoordinates {
s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters))
s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes))
} else {
s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled))
s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled))
}
s.handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire))
s.handleFuncMetrics("/v1/event/list", s.wrap(s.EventList))
s.handleFuncMetrics("/v1/health/node/", s.wrap(s.HealthNodeChecks))
s.handleFuncMetrics("/v1/health/checks/", s.wrap(s.HealthServiceChecks))
s.handleFuncMetrics("/v1/health/state/", s.wrap(s.HealthChecksInState))
s.handleFuncMetrics("/v1/health/service/", s.wrap(s.HealthServiceNodes))
s.handleFuncMetrics("/v1/internal/ui/nodes", s.wrap(s.UINodes))
s.handleFuncMetrics("/v1/internal/ui/node/", s.wrap(s.UINodeInfo))
s.handleFuncMetrics("/v1/internal/ui/services", s.wrap(s.UIServices))
s.handleFuncMetrics("/v1/kv/", s.wrap(s.KVSEndpoint))
s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration))
s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer))
s.handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral))
s.handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific))
s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate))
s.handleFuncMetrics("/v1/session/destroy/", s.wrap(s.SessionDestroy))
s.handleFuncMetrics("/v1/session/renew/", s.wrap(s.SessionRenew))
s.handleFuncMetrics("/v1/session/info/", s.wrap(s.SessionGet))
s.handleFuncMetrics("/v1/session/node/", s.wrap(s.SessionsForNode))
s.handleFuncMetrics("/v1/session/list", s.wrap(s.SessionList))
s.handleFuncMetrics("/v1/status/leader", s.wrap(s.StatusLeader))
s.handleFuncMetrics("/v1/status/peers", s.wrap(s.StatusPeers))
s.handleFuncMetrics("/v1/snapshot", s.wrap(s.Snapshot))
s.handleFuncMetrics("/v1/txn", s.wrap(s.Txn))
// Debug endpoints.
if enableDebug {
s.handleFuncMetrics("/debug/pprof/", pprof.Index)
s.handleFuncMetrics("/debug/pprof/cmdline", pprof.Cmdline)
@ -326,10 +319,6 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(assetFS())))
}
// API's are under /internal/ui/ to avoid conflict
s.handleFuncMetrics("/v1/internal/ui/nodes", s.wrap(s.UINodes))
s.handleFuncMetrics("/v1/internal/ui/node/", s.wrap(s.UINodeInfo))
s.handleFuncMetrics("/v1/internal/ui/services", s.wrap(s.UIServices))
}
// wrap is used to wrap functions to make them more convenient


@ -0,0 +1,50 @@
package agent
import (
"bytes"
"net/http"
"github.com/hashicorp/consul/consul/structs"
)
// Snapshot handles requests to take and restore snapshots. This uses a special
// mechanism to make the RPC since we potentially stream large amounts of data
// as part of these requests.
func (s *HTTPServer) Snapshot(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.SnapshotRequest
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
if _, ok := req.URL.Query()["stale"]; ok {
args.AllowStale = true
}
switch req.Method {
case "GET":
args.Op = structs.SnapshotSave
// Headers need to go out before we stream the body.
replyFn := func(reply *structs.SnapshotResponse) error {
setMeta(resp, &reply.QueryMeta)
return nil
}
// Don't bother sending any request body through since it will
// be ignored.
var null bytes.Buffer
if err := s.agent.SnapshotRPC(&args, &null, resp, replyFn); err != nil {
return nil, err
}
return nil, nil
case "PUT":
args.Op = structs.SnapshotRestore
if err := s.agent.SnapshotRPC(&args, req.Body, resp, nil); err != nil {
return nil, err
}
return nil, nil
default:
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
}
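For reference, the endpoint can also be exercised with a plain HTTP client; a minimal sketch, assuming a local agent on the default port and ACLs disabled:

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
)

func main() {
	// GET /v1/snapshot streams the archive; ?stale lets a follower answer even
	// when there is no leader, matching the AllowStale flag parsed above.
	resp, err := http.Get("http://127.0.0.1:8500/v1/snapshot?stale")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != 200 {
		panic(fmt.Sprintf("unexpected status: %s", resp.Status))
	}

	// The replyFn above wrote the query metadata into headers before the body.
	fmt.Println("snapshot index:", resp.Header.Get("X-Consul-Index"))

	// Discard the body here; a real caller would save it somewhere durable.
	if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil {
		panic(err)
	}
}

A restore is the mirror image: a PUT to /v1/snapshot with the saved archive as the request body.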


@ -0,0 +1,142 @@
package agent
import (
"bytes"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
)
func TestSnapshot(t *testing.T) {
var snap io.Reader
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/snapshot?token=root", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.Snapshot(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
snap = resp.Body
header := resp.Header().Get("X-Consul-Index")
if header == "" {
t.Fatalf("bad: %v", header)
}
header = resp.Header().Get("X-Consul-KnownLeader")
if header != "true" {
t.Fatalf("bad: %v", header)
}
header = resp.Header().Get("X-Consul-LastContact")
if header != "0" {
t.Fatalf("bad: %v", header)
}
})
httpTest(t, func(srv *HTTPServer) {
req, err := http.NewRequest("PUT", "/v1/snapshot?token=root", snap)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.Snapshot(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
})
}
func TestSnapshot_Options(t *testing.T) {
for _, method := range []string{"GET", "PUT"} {
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest(method, "/v1/snapshot?token=anonymous", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.Snapshot(resp, req)
if err == nil || !strings.Contains(err.Error(), "Permission denied") {
t.Fatalf("err: %v", err)
}
})
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest(method, "/v1/snapshot?dc=nope", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.Snapshot(resp, req)
if err == nil || !strings.Contains(err.Error(), "No path to datacenter") {
t.Fatalf("err: %v", err)
}
})
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest(method, "/v1/snapshot?token=root&stale", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.Snapshot(resp, req)
if method == "GET" {
if err != nil {
t.Fatalf("err: %v", err)
}
} else {
if err == nil || !strings.Contains(err.Error(), "stale not allowed") {
t.Fatalf("err: %v", err)
}
}
})
}
}
func TestSnapshot_BadMethods(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("POST", "/v1/snapshot", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.Snapshot(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 405 {
t.Fatalf("bad code: %d", resp.Code)
}
})
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("DELETE", "/v1/snapshot", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.Snapshot(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 405 {
t.Fatalf("bad code: %d", resp.Code)
}
})
}


@ -0,0 +1,48 @@
package command
import (
"strings"
"github.com/mitchellh/cli"
)
// SnapshotCommand is a Command implementation that just shows help for
// the subcommands nested below it.
type SnapshotCommand struct {
Ui cli.Ui
}
func (c *SnapshotCommand) Run(args []string) int {
return cli.RunResultHelp
}
func (c *SnapshotCommand) Help() string {
helpText := `
Usage: consul snapshot <subcommand> [options] [args]
This command has subcommands for saving and restoring the state of the Consul
servers for disaster recovery. These are atomic, point-in-time snapshots which
include key/value entries, service catalog, prepared queries, sessions, and
ACLs.
If ACLs are enabled, a management token must be supplied in order to perform
snapshot operations.
Create a snapshot:
$ consul snapshot save backup.snap
Restore a snapshot:
$ consul snapshot restore backup.snap
For more examples, ask for subcommand help or view the documentation.
`
return strings.TrimSpace(helpText)
}
func (c *SnapshotCommand) Synopsis() string {
return "Saves and restores snapshots of Consul server state"
}


@ -0,0 +1,15 @@
package command
import (
"testing"
"github.com/mitchellh/cli"
)
func TestSnapshotCommand_implements(t *testing.T) {
var _ cli.Command = &SnapshotCommand{}
}
func TestSnapshotCommand_noTabs(t *testing.T) {
assertNoTabs(t, new(SnapshotCommand))
}

command/snapshot_restore.go (new file, 103 lines)

@ -0,0 +1,103 @@
package command
import (
"flag"
"fmt"
"os"
"strings"
"github.com/hashicorp/consul/api"
"github.com/mitchellh/cli"
)
// SnapshotRestoreCommand is a Command implementation that is used to restore
// the state of the Consul servers for disaster recovery.
type SnapshotRestoreCommand struct {
Ui cli.Ui
}
func (c *SnapshotRestoreCommand) Help() string {
helpText := `
Usage: consul snapshot restore [options] FILE
Restores an atomic, point-in-time snapshot of the state of the Consul servers
which includes key/value entries, service catalog, prepared queries, sessions,
and ACLs.
Restores involve a potentially dangerous low-level Raft operation that is not
designed to handle server failures during a restore. This command is primarily
intended to be used when recovering from a disaster, restoring into a fresh
cluster of Consul servers.
If ACLs are enabled, a management token must be supplied in order to perform
snapshot operations.
To restore a snapshot from the file "backup.snap":
$ consul snapshot restore backup.snap
For a full list of options and examples, please see the Consul documentation.
` + apiOptsText
return strings.TrimSpace(helpText)
}
func (c *SnapshotRestoreCommand) Run(args []string) int {
cmdFlags := flag.NewFlagSet("get", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
datacenter := cmdFlags.String("datacenter", "", "")
token := cmdFlags.String("token", "", "")
httpAddr := HTTPAddrFlag(cmdFlags)
if err := cmdFlags.Parse(args); err != nil {
return 1
}
var file string
args = cmdFlags.Args()
switch len(args) {
case 0:
c.Ui.Error("Missing FILE argument")
return 1
case 1:
file = args[0]
default:
c.Ui.Error(fmt.Sprintf("Too many arguments (expected 1, got %d)", len(args)))
return 1
}
// Create and test the HTTP client
conf := api.DefaultConfig()
conf.Address = *httpAddr
conf.Token = *token
client, err := api.NewClient(conf)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
return 1
}
// Open the file.
f, err := os.Open(file)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error opening snapshot file: %s", err))
return 1
}
defer f.Close()
// Restore the snapshot.
err = client.Snapshot().Restore(&api.WriteOptions{
Datacenter: *datacenter,
}, f)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error restoring snapshot: %s", err))
return 1
}
c.Ui.Info("Restored snapshot")
return 0
}
func (c *SnapshotRestoreCommand) Synopsis() string {
return "Restores snapshot of Consul server state"
}

View File

@ -0,0 +1,103 @@
package command
import (
"io"
"io/ioutil"
"os"
"path"
"strings"
"testing"
"github.com/mitchellh/cli"
)
func TestSnapshotRestoreCommand_implements(t *testing.T) {
var _ cli.Command = &SnapshotRestoreCommand{}
}
func TestSnapshotRestoreCommand_noTabs(t *testing.T) {
assertNoTabs(t, new(SnapshotRestoreCommand))
}
func TestSnapshotRestoreCommand_Validation(t *testing.T) {
ui := new(cli.MockUi)
c := &SnapshotRestoreCommand{Ui: ui}
cases := map[string]struct {
args []string
output string
}{
"no file": {
[]string{},
"Missing FILE argument",
},
"extra args": {
[]string{"foo", "bar", "baz"},
"Too many arguments",
},
}
for name, tc := range cases {
// Ensure our buffer is always clear
if ui.ErrorWriter != nil {
ui.ErrorWriter.Reset()
}
if ui.OutputWriter != nil {
ui.OutputWriter.Reset()
}
code := c.Run(tc.args)
if code == 0 {
t.Errorf("%s: expected non-zero exit", name)
}
output := ui.ErrorWriter.String()
if !strings.Contains(output, tc.output) {
t.Errorf("%s: expected %q to contain %q", name, output, tc.output)
}
}
}
func TestSnapshotRestoreCommand_Run(t *testing.T) {
srv, client := testAgentWithAPIClient(t)
defer srv.Shutdown()
waitForLeader(t, srv.httpAddr)
ui := new(cli.MockUi)
c := &SnapshotRestoreCommand{Ui: ui}
dir, err := ioutil.TempDir("", "snapshot")
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dir)
file := path.Join(dir, "backup.tgz")
args := []string{
"-http-addr=" + srv.httpAddr,
file,
}
f, err := os.Create(file)
if err != nil {
t.Fatalf("err: %v", err)
}
snap, _, err := client.Snapshot().Save(nil)
if err != nil {
f.Close()
t.Fatalf("err: %v", err)
}
if _, err := io.Copy(f, snap); err != nil {
f.Close()
t.Fatalf("err: %v", err)
}
if err := f.Close(); err != nil {
t.Fatalf("err: %v", err)
}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
}

command/snapshot_save.go (new file, 132 lines)

@ -0,0 +1,132 @@
package command
import (
"flag"
"fmt"
"io"
"os"
"strings"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/consul/snapshot"
"github.com/mitchellh/cli"
)
// SnapshotSaveCommand is a Command implementation that is used to save the
// state of the Consul servers for disaster recovery.
type SnapshotSaveCommand struct {
Ui cli.Ui
}
func (c *SnapshotSaveCommand) Help() string {
helpText := `
Usage: consul snapshot save [options] FILE
Retrieves an atomic, point-in-time snapshot of the state of the Consul servers
which includes key/value entries, service catalog, prepared queries, sessions,
and ACLs.
If ACLs are enabled, a management token must be supplied in order to perform
snapshot operations.
To create a snapshot from the leader server and save it to "backup.snap":
$ consul snapshot save backup.snap
To create a potentially stale snapshot from any available server (useful if no
leader is available):
$ consul snapshot save -stale backup.snap
For a full list of options and examples, please see the Consul documentation.
` + apiOptsText
return strings.TrimSpace(helpText)
}
func (c *SnapshotSaveCommand) Run(args []string) int {
cmdFlags := flag.NewFlagSet("get", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
datacenter := cmdFlags.String("datacenter", "", "")
token := cmdFlags.String("token", "", "")
stale := cmdFlags.Bool("stale", false, "")
httpAddr := HTTPAddrFlag(cmdFlags)
if err := cmdFlags.Parse(args); err != nil {
return 1
}
var file string
args = cmdFlags.Args()
switch len(args) {
case 0:
c.Ui.Error("Missing FILE argument")
return 1
case 1:
file = args[0]
default:
c.Ui.Error(fmt.Sprintf("Too many arguments (expected 1, got %d)", len(args)))
return 1
}
// Create and test the HTTP client
conf := api.DefaultConfig()
conf.Address = *httpAddr
conf.Token = *token
client, err := api.NewClient(conf)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
return 1
}
// Take the snapshot.
snap, qm, err := client.Snapshot().Save(&api.QueryOptions{
Datacenter: *datacenter,
AllowStale: *stale,
})
if err != nil {
c.Ui.Error(fmt.Sprintf("Error saving snapshot: %s", err))
return 1
}
defer snap.Close()
// Save the file.
f, err := os.Create(file)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error creating snapshot file: %s", err))
return 1
}
if _, err := io.Copy(f, snap); err != nil {
f.Close()
c.Ui.Error(fmt.Sprintf("Error writing snapshot file: %s", err))
return 1
}
if err := f.Close(); err != nil {
c.Ui.Error(fmt.Sprintf("Error closing snapshot file after writing: %s", err))
return 1
}
// Read it back to verify.
f, err = os.Open(file)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error opening snapshot file for verify: %s", err))
return 1
}
if err := snapshot.Verify(f); err != nil {
f.Close()
c.Ui.Error(fmt.Sprintf("Error verifying snapshot file: %s", err))
return 1
}
if err := f.Close(); err != nil {
c.Ui.Error(fmt.Sprintf("Error closing snapshot file after verify: %s", err))
return 1
}
c.Ui.Info(fmt.Sprintf("Saved and verified snapshot to index %d", qm.LastIndex))
return 0
}
func (c *SnapshotSaveCommand) Synopsis() string {
return "Saves snapshot of Consul server state"
}


@ -0,0 +1,94 @@
package command
import (
"io/ioutil"
"os"
"path"
"strings"
"testing"
"github.com/mitchellh/cli"
)
func TestSnapshotSaveCommand_implements(t *testing.T) {
var _ cli.Command = &SnapshotSaveCommand{}
}
func TestSnapshotSaveCommand_noTabs(t *testing.T) {
assertNoTabs(t, new(SnapshotSaveCommand))
}
func TestSnapshotSaveCommand_Validation(t *testing.T) {
ui := new(cli.MockUi)
c := &SnapshotSaveCommand{Ui: ui}
cases := map[string]struct {
args []string
output string
}{
"no file": {
[]string{},
"Missing FILE argument",
},
"extra args": {
[]string{"foo", "bar", "baz"},
"Too many arguments",
},
}
for name, tc := range cases {
// Ensure our buffer is always clear
if ui.ErrorWriter != nil {
ui.ErrorWriter.Reset()
}
if ui.OutputWriter != nil {
ui.OutputWriter.Reset()
}
code := c.Run(tc.args)
if code == 0 {
t.Errorf("%s: expected non-zero exit", name)
}
output := ui.ErrorWriter.String()
if !strings.Contains(output, tc.output) {
t.Errorf("%s: expected %q to contain %q", name, output, tc.output)
}
}
}
func TestSnapshotSaveCommand_Run(t *testing.T) {
srv, client := testAgentWithAPIClient(t)
defer srv.Shutdown()
waitForLeader(t, srv.httpAddr)
ui := new(cli.MockUi)
c := &SnapshotSaveCommand{Ui: ui}
dir, err := ioutil.TempDir("", "snapshot")
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dir)
file := path.Join(dir, "backup.tgz")
args := []string{
"-http-addr=" + srv.httpAddr,
file,
}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
f, err := os.Open(file)
if err != nil {
t.Fatalf("err: %v", err)
}
defer f.Close()
if err := client.Snapshot().Restore(nil, f); err != nil {
t.Fatalf("err: %v", err)
}
}


@ -151,6 +151,24 @@ func init() {
}, nil
},
"snapshot": func() (cli.Command, error) {
return &command.SnapshotCommand{
Ui: ui,
}, nil
},
"snapshot restore": func() (cli.Command, error) {
return &command.SnapshotRestoreCommand{
Ui: ui,
}, nil
},
"snapshot save": func() (cli.Command, error) {
return &command.SnapshotSaveCommand{
Ui: ui,
}, nil
},
"version": func() (cli.Command, error) {
return &command.VersionCommand{
HumanVersion: GetHumanVersion(),


@ -2,6 +2,7 @@ package consul
import (
"fmt"
"io"
"log"
"os"
"path/filepath"
@ -337,6 +338,51 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
return nil
}
// SnapshotReplyFn gets a peek at the reply before the snapshot streams, which
// is useful for setting headers.
type SnapshotReplyFn func(reply *structs.SnapshotResponse) error
// SnapshotRPC sends the snapshot request to one of the servers, reading from
// the streaming input and writing to the streaming output depending on the
// operation.
func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
replyFn SnapshotReplyFn) error {
// Locate a server to make the request to.
server := c.servers.FindServer()
if server == nil {
return structs.ErrNoServers
}
// Request the operation.
var reply structs.SnapshotResponse
snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.Addr, args, in, &reply)
if err != nil {
return err
}
defer func() {
if err := snap.Close(); err != nil {
c.logger.Printf("[WARN] consul: Failed closing snapshot stream: %v", err)
}
}()
// Let the caller peek at the reply.
if replyFn != nil {
if err := replyFn(&reply); err != nil {
return nil
}
}
// Stream the snapshot.
if out != nil {
if _, err := io.Copy(out, snap); err != nil {
return fmt.Errorf("failed to stream snapshot: %v", err)
}
}
return nil
}
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (c *Client) Stats() map[string]map[string]string {


@ -1,6 +1,7 @@
package consul
import (
"bytes"
"fmt"
"net"
"os"
@ -360,6 +361,111 @@ func TestClient_RPC_TLS(t *testing.T) {
})
}
func TestClient_SnapshotRPC(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Wait for the leader
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Try to join.
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := c1.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if len(s1.LANMembers()) != 2 || len(c1.LANMembers()) != 2 {
t.Fatalf("Server has %v of %v expected members; Client has %v of %v expected members.", len(s1.LANMembers()), 2, len(c1.LANMembers()), 2)
}
// Wait until we've got a healthy server.
testutil.WaitForResult(func() (bool, error) {
return c1.servers.NumServers() == 1, nil
}, func(err error) {
t.Fatalf("expected consul server")
})
// Take a snapshot.
var snap bytes.Buffer
args := structs.SnapshotRequest{
Datacenter: "dc1",
Op: structs.SnapshotSave,
}
if err := c1.SnapshotRPC(&args, bytes.NewReader([]byte("")), &snap, nil); err != nil {
t.Fatalf("err: %v", err)
}
// Restore a snapshot.
args.Op = structs.SnapshotRestore
if err := c1.SnapshotRPC(&args, &snap, nil, nil); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestClient_SnapshotRPC_TLS(t *testing.T) {
dir1, conf1 := testServerConfig(t, "a.testco.internal")
conf1.VerifyIncoming = true
conf1.VerifyOutgoing = true
configureTLS(conf1)
s1, err := NewServer(conf1)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, conf2 := testClientConfig(t, "b.testco.internal")
conf2.VerifyOutgoing = true
configureTLS(conf2)
c1, err := NewClient(conf2)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Wait for the leader
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Try to join.
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := c1.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if len(s1.LANMembers()) != 2 || len(c1.LANMembers()) != 2 {
t.Fatalf("Server has %v of %v expected members; Client has %v of %v expected members.", len(s1.LANMembers()), 2, len(c1.LANMembers()), 2)
}
// Wait until we've got a healthy server.
testutil.WaitForResult(func() (bool, error) {
return c1.servers.NumServers() == 1, nil
}, func(err error) {
t.Fatalf("expected consul server")
})
// Take a snapshot.
var snap bytes.Buffer
args := structs.SnapshotRequest{
Datacenter: "dc1",
Op: structs.SnapshotSave,
}
if err := c1.SnapshotRPC(&args, bytes.NewReader([]byte("")), &snap, nil); err != nil {
t.Fatalf("err: %v", err)
}
// Restore a snapshot.
args.Op = structs.SnapshotRestore
if err := c1.SnapshotRPC(&args, &snap, nil, nil); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestClientServer_UserEvent(t *testing.T) {
clientOut := make(chan serf.UserEvent, 2)
dir1, c1 := testClientWithConfig(t, func(conf *Config) {


@ -248,18 +248,29 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error)
return nil, fmt.Errorf("rpc error: lead thread didn't get connection")
}
// getNewConn is used to return a new connection
func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, error) {
// HalfCloser is an interface that exposes a TCP half-close. We need this
// because we want to expose the raw TCP connection underlying a TLS one in a
// way that's hard to screw up and use for anything else. There's a change
// brewing that will allow us to use the TLS connection for this instead -
// https://go-review.googlesource.com/#/c/25159/.
type HalfCloser interface {
CloseWrite() error
}
// Dial is used to establish a raw connection to the given server.
func (p *ConnPool) Dial(dc string, addr net.Addr) (net.Conn, HalfCloser, error) {
// Try to dial the conn
conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second)
if err != nil {
return nil, err
return nil, nil, err
}
// Cast to TCPConn
var hc HalfCloser
if tcp, ok := conn.(*net.TCPConn); ok {
tcp.SetKeepAlive(true)
tcp.SetNoDelay(true)
hc = tcp
}
// Check if TLS is enabled
@ -267,18 +278,29 @@ func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, err
// Switch the connection into TLS mode
if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil {
conn.Close()
return nil, err
return nil, nil, err
}
// Wrap the connection in a TLS client
tlsConn, err := p.tlsWrap(dc, conn)
if err != nil {
conn.Close()
return nil, err
return nil, nil, err
}
conn = tlsConn
}
return conn, hc, nil
}
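The half-close is what lets the snapshot RPC signal end-of-request while still reading the streamed reply. A stand-alone illustration of the pattern (the address and payload are placeholders, and this is not Consul's RPC framing):

package main

import (
	"io"
	"net"
	"os"
	"time"
)

func main() {
	conn, err := net.DialTimeout("tcp", "127.0.0.1:9000", 10*time.Second)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	tcp := conn.(*net.TCPConn)

	// Send the request payload, then half-close the write side so the server
	// sees EOF on its read while our read side stays open.
	if _, err := tcp.Write([]byte("request payload")); err != nil {
		panic(err)
	}
	if err := tcp.CloseWrite(); err != nil {
		panic(err)
	}

	// Keep reading the response until the server closes its side.
	if _, err := io.Copy(os.Stdout, tcp); err != nil {
		panic(err)
	}
}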
// getNewConn is used to return a new connection
func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, error) {
// Get a new, raw connection.
conn, _, err := p.Dial(dc, addr)
if err != nil {
return nil, err
}
// Switch the multiplexing based on version
var session muxSession
if version < 2 {


@ -459,7 +459,9 @@ func TestPreparedQuery_Apply_ACLDeny(t *testing.T) {
}
func TestPreparedQuery_Apply_ForwardLeader(t *testing.T) {
dir1, s1 := testServer(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec1 := rpcClient(t, s1)


@ -27,6 +27,7 @@ const (
rpcMultiplex // Old Muxado byte, no longer supported.
rpcTLS
rpcMultiplexV2
rpcSnapshot
)
const (
@ -119,6 +120,9 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
case rpcMultiplexV2:
s.handleMultiplexV2(conn)
case rpcSnapshot:
s.handleSnapshotConn(conn)
default:
s.logger.Printf("[ERR] consul.rpc: unrecognized RPC byte: %v %s", buf[0], logConn(conn))
conn.Close()
@ -167,6 +171,17 @@ func (s *Server) handleConsulConn(conn net.Conn) {
}
}
// handleSnapshotConn is used to dispatch snapshot saves and restores, which
// stream so don't use the normal RPC mechanism.
func (s *Server) handleSnapshotConn(conn net.Conn) {
go func() {
defer conn.Close()
if err := s.handleSnapshotRequest(conn); err != nil {
s.logger.Printf("[ERR] consul.rpc: Snapshot RPC error: %v %s", err, logConn(conn))
}
}()
}
// forward is used to forward to a remote DC or to forward to the local leader
// Returns a bool of if forwarding was performed, as well as any error
func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
@ -216,9 +231,9 @@ CHECK_LEADER:
return true, structs.ErrNoLeader
}
// getLeader returns if the current node is the leader, and if not
// then it returns the leader which is potentially nil if the cluster
// has not yet elected a leader.
// getLeader returns if the current node is the leader, and if not then it
// returns the leader which is potentially nil if the cluster has not yet
// elected a leader.
func (s *Server) getLeader() (bool, *agent.Server) {
// Check if we are the leader
if s.IsLeader() {
@ -249,23 +264,29 @@ func (s *Server) forwardLeader(server *agent.Server, method string, args interfa
return s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, method, args, reply)
}
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error {
// Bail if we can't find any servers
// getRemoteServer returns a random server from a remote datacenter. This uses
// the bool parameter to signal that none were available.
func (s *Server) getRemoteServer(dc string) (*agent.Server, bool) {
s.remoteLock.RLock()
defer s.remoteLock.RUnlock()
servers := s.remoteConsuls[dc]
if len(servers) == 0 {
s.remoteLock.RUnlock()
return nil, false
}
offset := rand.Int31n(int32(len(servers)))
server := servers[offset]
return server, true
}
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error {
server, ok := s.getRemoteServer(dc)
if !ok {
s.logger.Printf("[WARN] consul.rpc: RPC request for DC '%s', no path found", dc)
return structs.ErrNoDCPath
}
// Select a random addr
offset := rand.Int31n(int32(len(servers)))
server := servers[offset]
s.remoteLock.RUnlock()
// Forward to remote Consul
metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
return s.connPool.RPC(dc, server.Addr, server.Version, method, args, reply)
}


@ -4,6 +4,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
@ -806,6 +807,39 @@ func (s *Server) RPC(method string, args interface{}, reply interface{}) error {
return codec.err
}
// SnapshotRPC dispatches the given snapshot request, reading from the streaming
// input and writing to the streaming output depending on the operation.
func (s *Server) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
replyFn SnapshotReplyFn) error {
// Perform the operation.
var reply structs.SnapshotResponse
snap, err := s.dispatchSnapshotRequest(args, in, &reply)
if err != nil {
return err
}
defer func() {
if err := snap.Close(); err != nil {
s.logger.Printf("[ERR] consul: Failed to close snapshot: %v", err)
}
}()
// Let the caller peek at the reply.
if replyFn != nil {
if err := replyFn(&reply); err != nil {
return nil
}
}
// Stream the snapshot.
if out != nil {
if _, err := io.Copy(out, snap); err != nil {
return fmt.Errorf("failed to stream snapshot: %v", err)
}
}
return nil
}
// InjectEndpoint is used to substitute an endpoint for testing.
func (s *Server) InjectEndpoint(endpoint interface{}) error {
s.logger.Printf("[WARN] consul: endpoint injected; this should only be used for testing")

consul/snapshot/archive.go (new file, 223 lines)

@ -0,0 +1,223 @@
// The archive utilities manage the internal format of a snapshot, which is a
// tar file with the following contents:
//
// meta.json - JSON-encoded snapshot metadata from Raft
// state.bin - Encoded snapshot data from Raft
// SHA256SUMS - SHA-256 sums of the above two files
//
// The integrity information is automatically created and checked, and a failure
// there just looks like an error to the caller.
package snapshot
import (
"archive/tar"
"bufio"
"bytes"
"crypto/sha256"
"encoding/json"
"fmt"
"hash"
"io"
"time"
"github.com/hashicorp/raft"
)
// hashList manages a list of filenames and their hashes.
type hashList struct {
hashes map[string]hash.Hash
}
// newHashList returns a new hashList.
func newHashList() *hashList {
return &hashList{
hashes: make(map[string]hash.Hash),
}
}
// Add creates a new hash for the given file.
func (hl *hashList) Add(file string) hash.Hash {
if existing, ok := hl.hashes[file]; ok {
return existing
}
h := sha256.New()
hl.hashes[file] = h
return h
}
// Encode takes the current sum of all the hashes and saves the hash list as a
// SHA256SUMS-style text file.
func (hl *hashList) Encode(w io.Writer) error {
for file, h := range hl.hashes {
if _, err := fmt.Fprintf(w, "%x %s\n", h.Sum([]byte{}), file); err != nil {
return err
}
}
return nil
}
// DecodeAndVerify reads a SHA256SUMS-style text file and checks the results
// against the current sums for all the hashes.
func (hl *hashList) DecodeAndVerify(r io.Reader) error {
// Read the file and make sure everything in there has a matching hash.
seen := make(map[string]struct{})
s := bufio.NewScanner(r)
for s.Scan() {
sha := make([]byte, sha256.Size)
var file string
if _, err := fmt.Sscanf(s.Text(), "%x %s", &sha, &file); err != nil {
return err
}
h, ok := hl.hashes[file]
if !ok {
return fmt.Errorf("list missing hash for %q", file)
}
if !bytes.Equal(sha, h.Sum([]byte{})) {
return fmt.Errorf("hash check failed for %q", file)
}
seen[file] = struct{}{}
}
if err := s.Err(); err != nil {
return err
}
// Make sure everything we had a hash for was seen.
for file, _ := range hl.hashes {
if _, ok := seen[file]; !ok {
return fmt.Errorf("file missing for %q", file)
}
}
return nil
}
// write takes a writer and creates an archive with the snapshot metadata,
// the snapshot itself, and adds some integrity checking information.
func write(out io.Writer, metadata *raft.SnapshotMeta, snap io.Reader) error {
// Start a new tarball.
now := time.Now()
archive := tar.NewWriter(out)
// Create a hash list that we will use to write a SHA256SUMS file into
// the archive.
hl := newHashList()
// Encode the snapshot metadata, which we need to feed back during a
// restore.
metaHash := hl.Add("meta.json")
var metaBuffer bytes.Buffer
enc := json.NewEncoder(&metaBuffer)
if err := enc.Encode(metadata); err != nil {
return fmt.Errorf("failed to encode snapshot metadata: %v", err)
}
if err := archive.WriteHeader(&tar.Header{
Name: "meta.json",
Mode: 0600,
Size: int64(metaBuffer.Len()),
ModTime: now,
}); err != nil {
return fmt.Errorf("failed to write snapshot metadata header: %v", err)
}
if _, err := io.Copy(archive, io.TeeReader(&metaBuffer, metaHash)); err != nil {
return fmt.Errorf("failed to write snapshot metadata: %v", err)
}
// Copy the snapshot data given the size from the metadata.
snapHash := hl.Add("state.bin")
if err := archive.WriteHeader(&tar.Header{
Name: "state.bin",
Mode: 0600,
Size: metadata.Size,
ModTime: now,
}); err != nil {
return fmt.Errorf("failed to write snapshot data header: %v", err)
}
if _, err := io.CopyN(archive, io.TeeReader(snap, snapHash), metadata.Size); err != nil {
return fmt.Errorf("failed to write snapshot metadata: %v", err)
}
// Create a SHA256SUMS file that we can use to verify on restore.
var shaBuffer bytes.Buffer
if err := hl.Encode(&shaBuffer); err != nil {
return fmt.Errorf("failed to encode snapshot hashes: %v", err)
}
if err := archive.WriteHeader(&tar.Header{
Name: "SHA256SUMS",
Mode: 0600,
Size: int64(shaBuffer.Len()),
ModTime: now,
}); err != nil {
return fmt.Errorf("failed to write snapshot hashes header: %v", err)
}
if _, err := io.Copy(archive, &shaBuffer); err != nil {
return fmt.Errorf("failed to write snapshot metadata: %v", err)
}
// Finalize the archive.
if err := archive.Close(); err != nil {
return fmt.Errorf("failed to finalize snapshot: %v", err)
}
return nil
}
// read takes a reader and extracts the snapshot metadata and the snapshot
// itself, and also checks the integrity of the data. You must arrange to call
// Close() on the returned object or else you will leak a temporary file.
func read(in io.Reader, metadata *raft.SnapshotMeta, snap io.Writer) error {
// Start a new tar reader.
archive := tar.NewReader(in)
// Create a hash list that we will use to compare with the SHA256SUMS
// file in the archive.
hl := newHashList()
// Populate the hashes for all the files we expect to see. The check at
// the end will make sure these are all present in the SHA256SUMS file
// and that the hashes match.
metaHash := hl.Add("meta.json")
snapHash := hl.Add("state.bin")
// Look through the archive for the pieces we care about.
var shaBuffer bytes.Buffer
for {
hdr, err := archive.Next()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed reading snapshot: %v", err)
}
switch hdr.Name {
case "meta.json":
dec := json.NewDecoder(io.TeeReader(archive, metaHash))
if err := dec.Decode(&metadata); err != nil {
return fmt.Errorf("failed to decode snapshot metadata: %v", err)
}
case "state.bin":
if _, err := io.Copy(io.MultiWriter(snap, snapHash), archive); err != nil {
return fmt.Errorf("failed to read or write snapshot data: %v", err)
}
case "SHA256SUMS":
if _, err := io.Copy(&shaBuffer, archive); err != nil {
return fmt.Errorf("failed to read snapshot hashes: %v", err)
}
default:
return fmt.Errorf("unexpected file %q in snapshot", hdr.Name)
}
}
// Verify all the hashes.
if err := hl.DecodeAndVerify(&shaBuffer); err != nil {
return fmt.Errorf("failed checking integrity of snapshot: %v", err)
}
return nil
}
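Because the archive is a plain tar stream (gzip-compressed by snapshot.go below before it leaves the server), a saved snapshot can be inspected with the standard library. A small sketch, assuming a backup.snap produced by the save path:

package main

import (
	"archive/tar"
	"compress/gzip"
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.Open("backup.snap")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	gz, err := gzip.NewReader(f)
	if err != nil {
		panic(err)
	}
	defer gz.Close()

	// Expect meta.json, state.bin, and SHA256SUMS, as described above.
	tr := tar.NewReader(gz)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s\t%d bytes\n", hdr.Name, hdr.Size)
	}
}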

View File

@ -0,0 +1,134 @@
package snapshot
import (
"bytes"
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"os"
"reflect"
"strings"
"testing"
"github.com/hashicorp/raft"
)
func TestArchive(t *testing.T) {
// Create some fake snapshot data.
metadata := raft.SnapshotMeta{
Index: 2005,
Term: 2011,
Configuration: raft.Configuration{
Servers: []raft.Server{
raft.Server{
Suffrage: raft.Voter,
ID: raft.ServerID("hello"),
Address: raft.ServerAddress("127.0.0.1:8300"),
},
},
},
Size: 1024,
}
var snap bytes.Buffer
var expected bytes.Buffer
both := io.MultiWriter(&snap, &expected)
if _, err := io.Copy(both, io.LimitReader(rand.Reader, 1024)); err != nil {
t.Fatalf("err: %v", err)
}
// Write out the snapshot.
var archive bytes.Buffer
if err := write(&archive, &metadata, &snap); err != nil {
t.Fatalf("err: %v", err)
}
// Read the snapshot back.
var newMeta raft.SnapshotMeta
var newSnap bytes.Buffer
if err := read(&archive, &newMeta, &newSnap); err != nil {
t.Fatalf("err: %v", err)
}
// Check the contents.
if !reflect.DeepEqual(newMeta, metadata) {
t.Fatalf("bad: %#v", newMeta)
}
var buf bytes.Buffer
if _, err := io.Copy(&buf, &newSnap); err != nil {
t.Fatalf("err: %v", err)
}
if !bytes.Equal(buf.Bytes(), expected.Bytes()) {
t.Fatalf("snapshot contents didn't match")
}
}
func TestArchive_BadData(t *testing.T) {
cases := []struct {
Name string
Error string
}{
{"../../test/snapshot/empty.tar", "failed checking integrity of snapshot"},
{"../../test/snapshot/extra.tar", "unexpected file \"nope\""},
{"../../test/snapshot/missing-meta.tar", "hash check failed for \"meta.json\""},
{"../../test/snapshot/missing-state.tar", "hash check failed for \"state.bin\""},
{"../../test/snapshot/missing-sha.tar", "file missing"},
{"../../test/snapshot/corrupt-meta.tar", "hash check failed for \"meta.json\""},
{"../../test/snapshot/corrupt-state.tar", "hash check failed for \"state.bin\""},
{"../../test/snapshot/corrupt-sha.tar", "list missing hash for \"nope\""},
}
for i, c := range cases {
f, err := os.Open(c.Name)
if err != nil {
t.Fatalf("err: %v", err)
}
defer f.Close()
var metadata raft.SnapshotMeta
err = read(f, &metadata, ioutil.Discard)
if err == nil || !strings.Contains(err.Error(), c.Error) {
t.Fatalf("case %d (%s): %v", i, c.Name, err)
}
}
}
func TestArchive_hashList(t *testing.T) {
hl := newHashList()
for i := 0; i < 16; i++ {
h := hl.Add(fmt.Sprintf("file-%d", i))
if _, err := io.CopyN(h, rand.Reader, 32); err != nil {
t.Fatalf("err: %v", err)
}
}
// Do a normal round trip.
var buf bytes.Buffer
if err := hl.Encode(&buf); err != nil {
t.Fatalf("err: %v", err)
}
if err := hl.DecodeAndVerify(&buf); err != nil {
t.Fatalf("err: %v", err)
}
// Have a local hash that isn't in the file.
buf.Reset()
if err := hl.Encode(&buf); err != nil {
t.Fatalf("err: %v", err)
}
hl.Add("nope")
err := hl.DecodeAndVerify(&buf)
if err == nil || !strings.Contains(err.Error(), "file missing for \"nope\"") {
t.Fatalf("err: %v", err)
}
// Have a hash in the file that we haven't seen locally.
buf.Reset()
if err := hl.Encode(&buf); err != nil {
t.Fatalf("err: %v", err)
}
delete(hl.hashes, "nope")
err = hl.DecodeAndVerify(&buf)
if err == nil || !strings.Contains(err.Error(), "list missing hash for \"nope\"") {
t.Fatalf("err: %v", err)
}
}

consul/snapshot/snapshot.go (new file, 193 lines)

@ -0,0 +1,193 @@
// snapshot manages the interactions between Consul and Raft in order to take
// and restore snapshots for disaster recovery. The internal format of a
// snapshot is simply a tar file, as described in archive.go.
package snapshot
import (
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"time"
"github.com/hashicorp/raft"
)
// Snapshot is a structure that holds state about a temporary file that is used
// to hold a snapshot. By using an intermediate file we avoid holding everything
// in memory.
type Snapshot struct {
file *os.File
index uint64
}
// New takes a state snapshot of the given Raft instance into a temporary file
// and returns an object that gives access to the file as an io.Reader. You must
// arrange to call Close() on the returned object or else you will leak a
// temporary file.
func New(logger *log.Logger, r *raft.Raft) (*Snapshot, error) {
// Take the snapshot.
future := r.Snapshot()
if err := future.Error(); err != nil {
return nil, fmt.Errorf("Raft error when taking snapshot: %v", err)
}
// Open up the snapshot.
metadata, snap, err := future.Open()
if err != nil {
return nil, fmt.Errorf("failed to open snapshot: %v:", err)
}
defer func() {
if err := snap.Close(); err != nil {
logger.Printf("[ERR] snapshot: Failed to close Raft snapshot: %v", err)
}
}()
// Make a scratch file to receive the contents so that we don't buffer
// everything in memory. This gets deleted in Close() since we keep it
// around for re-reading.
archive, err := ioutil.TempFile("", "snapshot")
if err != nil {
return nil, fmt.Errorf("failed to create snapshot file: %v", err)
}
// If anything goes wrong after this point, we will attempt to clean up
// the temp file. The happy path will disarm this.
var keep bool
defer func() {
if keep {
return
}
if err := os.Remove(archive.Name()); err != nil {
logger.Printf("[ERR] snapshot: Failed to clean up temp snapshot: %v", err)
}
}()
// Wrap the file writer in a gzip compressor.
compressor := gzip.NewWriter(archive)
// Write the archive.
if err := write(compressor, metadata, snap); err != nil {
return nil, fmt.Errorf("failed to write snapshot file: %v", err)
}
// Finish the compressed stream.
if err := compressor.Close(); err != nil {
return nil, fmt.Errorf("failed to compress snapshot file: %v", err)
}
// Sync the compressed file and rewind it so it's ready to be streamed
// out by the caller.
if err := archive.Sync(); err != nil {
return nil, fmt.Errorf("failed to sync snapshot: %v", err)
}
if _, err := archive.Seek(0, 0); err != nil {
return nil, fmt.Errorf("failed to rewind snapshot: %v", err)
}
keep = true
return &Snapshot{archive, metadata.Index}, nil
}
// Index returns the index of the snapshot. This is safe to call on a nil
// snapshot, it will just return 0.
func (s *Snapshot) Index() uint64 {
if s == nil {
return 0
}
return s.index
}
// Read passes through to the underlying snapshot file. This is safe to call on
// a nil snapshot, it will just return an EOF.
func (s *Snapshot) Read(p []byte) (n int, err error) {
if s == nil {
return 0, io.EOF
}
return s.file.Read(p)
}
// Close closes the snapshot and removes any temporary storage associated with
// it. You must arrange to call this whenever NewSnapshot() has been called
// successfully. This is safe to call on a nil snapshot.
func (s *Snapshot) Close() error {
if s == nil {
return nil
}
if err := s.file.Close(); err != nil {
return err
}
return os.Remove(s.file.Name())
}
// Verify takes the snapshot from the reader and verifies its contents.
func Verify(in io.Reader) error {
// Wrap the reader in a gzip decompressor.
decomp, err := gzip.NewReader(in)
if err != nil {
return fmt.Errorf("failed to decompress snapshot: %v", err)
}
defer decomp.Close()
// Read the archive, throwing away the snapshot data.
var metadata raft.SnapshotMeta
if err := read(decomp, &metadata, ioutil.Discard); err != nil {
return fmt.Errorf("failed to read snapshot file: %v", err)
}
return nil
}
// Restore takes the snapshot from the reader and attempts to apply it to the
// given Raft instance.
func Restore(logger *log.Logger, in io.Reader, r *raft.Raft) error {
// Wrap the reader in a gzip decompressor.
decomp, err := gzip.NewReader(in)
if err != nil {
return fmt.Errorf("failed to decompress snapshot: %v", err)
}
defer func() {
if err := decomp.Close(); err != nil {
logger.Printf("[ERR] snapshot: Failed to close snapshot decompressor: %v", err)
}
}()
// Make a scratch file to receive the contents of the snapshot data so
// we can avoid buffering in memory.
snap, err := ioutil.TempFile("", "snapshot")
if err != nil {
return fmt.Errorf("failed to create temp snapshot file: %v", err)
}
defer func() {
if err := snap.Close(); err != nil {
logger.Printf("[ERR] snapshot: Failed to close temp snapshot: %v", err)
}
if err := os.Remove(snap.Name()); err != nil {
logger.Printf("[ERR] snapshot: Failed to clean up temp snapshot: %v", err)
}
}()
// Read the archive.
var metadata raft.SnapshotMeta
if err := read(decomp, &metadata, snap); err != nil {
return fmt.Errorf("failed to read snapshot file: %v", err)
}
// Sync and rewind the file so it's ready to be read again.
if err := snap.Sync(); err != nil {
return fmt.Errorf("failed to sync temp snapshot: %v", err)
}
if _, err := snap.Seek(0, 0); err != nil {
return fmt.Errorf("failed to rewind temp snapshot: %v", err)
}
// Feed the snapshot into Raft.
if err := r.Restore(&metadata, snap, 60*time.Second); err != nil {
return fmt.Errorf("Raft error when restoring snapshot: %v", err)
}
return nil
}
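For orientation, here is a minimal usage sketch (not part of this change) showing how the package above can be driven end to end. The package name, the backup file path, and the pre-configured *raft.Raft instance are illustrative assumptions; the rewind before Restore mirrors what the test below does after Verify.
package example
import (
	"io"
	"log"
	"os"
	"github.com/hashicorp/consul/consul/snapshot"
	"github.com/hashicorp/raft"
)
// saveAndRestore streams a snapshot of r out to backup.snap, then verifies
// the archive and feeds it back into the same Raft instance.
func saveAndRestore(logger *log.Logger, r *raft.Raft) error {
	snap, err := snapshot.New(logger, r)
	if err != nil {
		return err
	}
	defer snap.Close()
	out, err := os.Create("backup.snap")
	if err != nil {
		return err
	}
	defer out.Close()
	if _, err := io.Copy(out, snap); err != nil {
		return err
	}
	logger.Printf("saved snapshot at index %d", snap.Index())
	in, err := os.Open("backup.snap")
	if err != nil {
		return err
	}
	defer in.Close()
	if err := snapshot.Verify(in); err != nil {
		return err
	}
	if _, err := in.Seek(0, 0); err != nil {
		return err
	}
	return snapshot.Restore(logger, in, r)
}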


@ -0,0 +1,297 @@
package snapshot
import (
"bytes"
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path"
"strings"
"sync"
"testing"
"time"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/raft"
)
// MockFSM is a simple FSM for testing that simply stores its logs in a slice of
// byte slices.
type MockFSM struct {
sync.Mutex
logs [][]byte
}
// MockSnapshot is a snapshot sink for testing that encodes the contents of a
// MockFSM using msgpack.
type MockSnapshot struct {
logs [][]byte
maxIndex int
}
// See raft.FSM.
func (m *MockFSM) Apply(log *raft.Log) interface{} {
m.Lock()
defer m.Unlock()
m.logs = append(m.logs, log.Data)
return len(m.logs)
}
// See raft.FSM.
func (m *MockFSM) Snapshot() (raft.FSMSnapshot, error) {
m.Lock()
defer m.Unlock()
return &MockSnapshot{m.logs, len(m.logs)}, nil
}
// See raft.FSM.
func (m *MockFSM) Restore(in io.ReadCloser) error {
m.Lock()
defer m.Unlock()
defer in.Close()
hd := codec.MsgpackHandle{}
dec := codec.NewDecoder(in, &hd)
m.logs = nil
return dec.Decode(&m.logs)
}
// See raft.SnapshotSink.
func (m *MockSnapshot) Persist(sink raft.SnapshotSink) error {
hd := codec.MsgpackHandle{}
enc := codec.NewEncoder(sink, &hd)
if err := enc.Encode(m.logs[:m.maxIndex]); err != nil {
sink.Cancel()
return err
}
sink.Close()
return nil
}
// See raft.SnapshotSink.
func (m *MockSnapshot) Release() {
}
// makeRaft returns a Raft and its FSM, with snapshots based in the given dir.
func makeRaft(t *testing.T, dir string) (*raft.Raft, *MockFSM) {
snaps, err := raft.NewFileSnapshotStore(dir, 5, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
fsm := &MockFSM{}
store := raft.NewInmemStore()
addr, trans := raft.NewInmemTransport("")
config := raft.DefaultConfig()
config.LocalID = raft.ServerID(fmt.Sprintf("server-%s", addr))
var members raft.Configuration
members.Servers = append(members.Servers, raft.Server{
Suffrage: raft.Voter,
ID: config.LocalID,
Address: addr,
})
err = raft.BootstrapCluster(config, store, store, snaps, trans, members)
if err != nil {
t.Fatalf("err: %v", err)
}
raft, err := raft.NewRaft(config, fsm, store, store, snaps, trans)
if err != nil {
t.Fatalf("err: %v", err)
}
timeout := time.After(10 * time.Second)
for {
if raft.Leader() != "" {
break
}
select {
case <-raft.LeaderCh():
case <-time.After(1 * time.Second):
// Need to poll because we might have missed the first
// go with the leader channel.
case <-timeout:
t.Fatalf("timed out waiting for leader")
}
}
return raft, fsm
}
func TestSnapshot(t *testing.T) {
dir, err := ioutil.TempDir("", "snapshot")
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dir)
// Make a Raft and populate it with some data. We tee everything we
// apply off to a buffer for checking post-snapshot.
var expected []bytes.Buffer
before, _ := makeRaft(t, path.Join(dir, "before"))
defer before.Shutdown()
for i := 0; i < 64*1024; i++ {
var log bytes.Buffer
var copy bytes.Buffer
both := io.MultiWriter(&log, &copy)
if _, err := io.CopyN(both, rand.Reader, 256); err != nil {
t.Fatalf("err: %v", err)
}
future := before.Apply(log.Bytes(), time.Second)
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
expected = append(expected, copy)
}
// Take a snapshot.
logger := log.New(os.Stdout, "", 0)
snap, err := New(logger, before)
if err != nil {
t.Fatalf("err: %v", err)
}
defer snap.Close()
// Verify the snapshot. We have to rewind it after for the restore.
if err := Verify(snap); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := snap.file.Seek(0, 0); err != nil {
t.Fatalf("err: %v", err)
}
// Make a new, independent Raft.
after, fsm := makeRaft(t, path.Join(dir, "after"))
defer after.Shutdown()
// Put some initial data in there that the snapshot should overwrite.
for i := 0; i < 16; i++ {
var log bytes.Buffer
if _, err := io.CopyN(&log, rand.Reader, 256); err != nil {
t.Fatalf("err: %v", err)
}
future := after.Apply(log.Bytes(), time.Second)
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
}
// Restore the snapshot.
if err := Restore(logger, snap, after); err != nil {
t.Fatalf("err: %v", err)
}
// Compare the contents.
fsm.Lock()
defer fsm.Unlock()
if len(fsm.logs) != len(expected) {
t.Fatalf("bad: %d vs. %d", len(fsm.logs), len(expected))
}
for i := range fsm.logs {
if !bytes.Equal(fsm.logs[i], expected[i].Bytes()) {
t.Fatalf("bad: log %d doesn't match", i)
}
}
}
func TestSnapshot_Nil(t *testing.T) {
var snap *Snapshot
if idx := snap.Index(); idx != 0 {
t.Fatalf("bad: %d", idx)
}
n, err := snap.Read(make([]byte, 16))
if n != 0 || err != io.EOF {
t.Fatalf("bad: %d %v", n, err)
}
if err := snap.Close(); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestSnapshot_BadVerify(t *testing.T) {
buf := bytes.NewBuffer([]byte("nope"))
err := Verify(buf)
if err == nil || !strings.Contains(err.Error(), "unexpected EOF") {
t.Fatalf("err: %v", err)
}
}
func TestSnapshot_BadRestore(t *testing.T) {
dir, err := ioutil.TempDir("", "snapshot")
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dir)
// Make a Raft and populate it with some data.
before, _ := makeRaft(t, path.Join(dir, "before"))
defer before.Shutdown()
for i := 0; i < 16*1024; i++ {
var log bytes.Buffer
if _, err := io.CopyN(&log, rand.Reader, 256); err != nil {
t.Fatalf("err: %v", err)
}
future := before.Apply(log.Bytes(), time.Second)
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
}
// Take a snapshot.
logger := log.New(os.Stdout, "", 0)
snap, err := New(logger, before)
if err != nil {
t.Fatalf("err: %v", err)
}
// Make a new, independent Raft.
after, fsm := makeRaft(t, path.Join(dir, "after"))
defer after.Shutdown()
// Put some initial data in there that should not be harmed by the
// failed restore attempt.
var expected []bytes.Buffer
for i := 0; i < 16; i++ {
var log bytes.Buffer
var copy bytes.Buffer
both := io.MultiWriter(&log, &copy)
if _, err := io.CopyN(both, rand.Reader, 256); err != nil {
t.Fatalf("err: %v", err)
}
future := after.Apply(log.Bytes(), time.Second)
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
expected = append(expected, copy)
}
// Attempt to restore a truncated version of the snapshot. This is
// expected to fail.
err = Restore(logger, io.LimitReader(snap, 512), after)
if err == nil || !strings.Contains(err.Error(), "unexpected EOF") {
t.Fatalf("err: %v", err)
}
// Compare the contents to make sure the aborted restore didn't harm
// anything.
fsm.Lock()
defer fsm.Unlock()
if len(fsm.logs) != len(expected) {
t.Fatalf("bad: %d vs. %d", len(fsm.logs), len(expected))
}
for i := range fsm.logs {
if !bytes.Equal(fsm.logs[i], expected[i].Bytes()) {
t.Fatalf("bad: log %d doesn't match", i)
}
}
}

consul/snapshot_endpoint.go (new file)

@ -0,0 +1,224 @@
// The snapshot endpoint is a special non-RPC endpoint that supports streaming
// for taking and restoring snapshots for disaster recovery. This gets wired
// directly into Consul's stream handler, and a new TCP connection is made for
// each request.
//
// This also includes a SnapshotRPC() function, which acts as a lightweight
// client that knows the details of the stream protocol.
package consul
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"github.com/hashicorp/consul/consul/snapshot"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-msgpack/codec"
)
const (
// noSnapshotsInDevMode indicates that snapshots aren't available for a
// server in dev mode.
noSnapshotsInDevMode = "Snapshot operations not available in dev mode"
)
// dispatchSnapshotRequest takes an incoming request structure with possibly some
// streaming data (for a restore) and returns possibly some streaming data (for
// a snapshot save). We can't use the normal RPC mechanism in a streaming manner
// like this, so we have to dispatch these by hand.
func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Reader,
reply *structs.SnapshotResponse) (io.ReadCloser, error) {
// Perform DC forwarding.
if dc := args.Datacenter; dc != s.config.Datacenter {
server, ok := s.getRemoteServer(dc)
if !ok {
return nil, structs.ErrNoDCPath
}
return SnapshotRPC(s.connPool, dc, server.Addr, args, in, reply)
}
// Perform leader forwarding if required.
if !args.AllowStale {
if isLeader, server := s.getLeader(); !isLeader {
if server == nil {
return nil, structs.ErrNoLeader
}
return SnapshotRPC(s.connPool, args.Datacenter, server.Addr, args, in, reply)
}
}
// Snapshots don't work in dev mode because we need Raft's snapshots to
// be readable. Better to present a clear error than one from deep down
// in the Raft snapshot store.
if s.config.DevMode {
return nil, errors.New(noSnapshotsInDevMode)
}
// Verify token is allowed to operate on snapshots. There's only a
// single ACL sense here (not read and write) since reading gets you
// all the ACLs and you could escalate from there.
if acl, err := s.resolveToken(args.Token); err != nil {
return nil, err
} else if acl != nil && !acl.Snapshot() {
return nil, permissionDeniedErr
}
// Dispatch the operation.
switch args.Op {
case structs.SnapshotSave:
if !args.AllowStale {
if err := s.consistentRead(); err != nil {
return nil, err
}
}
// Set the metadata here before we do anything; this should always be
// pessimistic if we get more data while the snapshot is being taken.
s.setQueryMeta(&reply.QueryMeta)
// Take the snapshot and capture the index.
snap, err := snapshot.New(s.logger, s.raft)
reply.Index = snap.Index()
return snap, err
case structs.SnapshotRestore:
if args.AllowStale {
return nil, fmt.Errorf("stale not allowed for restore")
}
// Restore the snapshot.
if err := snapshot.Restore(s.logger, in, s.raft); err != nil {
return nil, err
}
// Run a barrier so we are sure that our FSM is caught up with
// any snapshot restore details (it's also part of Raft's restore
// process but we don't want to depend on that detail for this to
// be correct). Once that works, we can redo the leader actions
// so our leader-maintained state will be up to date.
barrier := s.raft.Barrier(0)
if err := barrier.Error(); err != nil {
return nil, err
}
if err := s.revokeLeadership(); err != nil {
return nil, err
}
if err := s.establishLeadership(); err != nil {
return nil, err
}
// Give the caller back an empty reader since there's nothing to
// stream back.
return ioutil.NopCloser(bytes.NewReader([]byte(""))), nil
default:
return nil, fmt.Errorf("unrecognized snapshot op %q", args.Op)
}
}
// handleSnapshotRequest reads the request from the conn and dispatches it. This
// will be called from a goroutine after an incoming stream is determined to be
// a snapshot request.
func (s *Server) handleSnapshotRequest(conn net.Conn) error {
var args structs.SnapshotRequest
dec := codec.NewDecoder(conn, &codec.MsgpackHandle{})
if err := dec.Decode(&args); err != nil {
return fmt.Errorf("failed to decode request: %v", err)
}
var reply structs.SnapshotResponse
snap, err := s.dispatchSnapshotRequest(&args, conn, &reply)
if err != nil {
reply.Error = err.Error()
goto RESPOND
}
defer func() {
if err := snap.Close(); err != nil {
s.logger.Printf("[ERR] consul: Failed to close snapshot: %v", err)
}
}()
RESPOND:
enc := codec.NewEncoder(conn, &codec.MsgpackHandle{})
if err := enc.Encode(&reply); err != nil {
return fmt.Errorf("failed to encode response: %v", err)
}
if snap != nil {
if _, err := io.Copy(conn, snap); err != nil {
return fmt.Errorf("failed to stream snapshot: %v", err)
}
}
return nil
}
// SnapshotRPC is a streaming client function for performing a snapshot RPC
// request to a remote server. It will create a fresh connection for each
// request, send the request header, and then stream in any data from the
// reader (for a restore). It will then parse the received response header, and
// if there's no error will return an io.ReadCloser (that you must close) with
// the streaming output (for a snapshot). If the reply contains an error, this
// will always return an error as well, so you don't need to check the error
// inside the filled-in reply.
func SnapshotRPC(pool *ConnPool, dc string, addr net.Addr,
args *structs.SnapshotRequest, in io.Reader, reply *structs.SnapshotResponse) (io.ReadCloser, error) {
conn, hc, err := pool.Dial(dc, addr)
if err != nil {
return nil, err
}
// keep will disarm the defer on success if we are returning our connection
// to the caller so it can stream the output.
var keep bool
defer func() {
if !keep {
conn.Close()
}
}()
// Write the snapshot RPC byte to set the mode, then perform the
// request.
if _, err := conn.Write([]byte{byte(rpcSnapshot)}); err != nil {
return nil, fmt.Errorf("failed to write stream type: %v", err)
}
// Push the header encoded as msgpack, then stream the input.
enc := codec.NewEncoder(conn, &codec.MsgpackHandle{})
if err := enc.Encode(&args); err != nil {
return nil, fmt.Errorf("failed to encode request: %v", err)
}
if _, err := io.Copy(conn, in); err != nil {
return nil, fmt.Errorf("failed to copy snapshot in: %v", err)
}
// Our RPC protocol requires support for a half-close in order to signal
// the other side that they are done reading the stream, since we don't
// know the size in advance. This saves us from having to buffer just to
// calculate the size.
if hc != nil {
if err := hc.CloseWrite(); err != nil {
return nil, fmt.Errorf("failed to half close snapshot connection: %v", err)
}
} else {
return nil, fmt.Errorf("snapshot connection requires half-close support")
}
// Pull the header decoded as msgpack. The caller can continue to read
// the conn to stream the remaining data.
dec := codec.NewDecoder(conn, &codec.MsgpackHandle{})
if err := dec.Decode(reply); err != nil {
return nil, fmt.Errorf("failed to decode response: %v", err)
}
if reply.Error != "" {
return nil, errors.New(reply.Error)
}
keep = true
return conn, nil
}
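As a usage sketch only: a hypothetical helper in the same package could drive SnapshotRPC the way the endpoint tests do. The helper name and output path are made up; the unexported Server fields (connPool, config) are reachable here only because this sits inside package consul.
package consul
import (
	"bytes"
	"io"
	"os"
	"github.com/hashicorp/consul/consul/structs"
)
// saveSnapshotToFile performs a snapshot save RPC against the given server
// and writes the resulting archive to path.
func saveSnapshotToFile(s *Server, path string) error {
	args := structs.SnapshotRequest{
		Datacenter: s.config.Datacenter,
		Op:         structs.SnapshotSave,
	}
	var reply structs.SnapshotResponse
	snap, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
		&args, bytes.NewReader(nil), &reply)
	if err != nil {
		return err
	}
	defer snap.Close()
	out, err := os.Create(path)
	if err != nil {
		return err
	}
	defer out.Close()
	_, err = io.Copy(out, snap)
	return err
}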


@ -0,0 +1,416 @@
package consul
import (
"bytes"
"fmt"
"os"
"strings"
"testing"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
// verifySnapshot is a helper that does a snapshot and restore.
func verifySnapshot(t *testing.T, s *Server, dc, token string) {
codec := rpcClient(t, s)
defer codec.Close()
// Set a key to a before value.
{
args := structs.KVSRequest{
Datacenter: dc,
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "test",
Value: []byte("hello"),
},
WriteRequest: structs.WriteRequest{
Token: token,
},
}
var out bool
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
// Take a snapshot.
args := structs.SnapshotRequest{
Datacenter: dc,
Token: token,
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
snap, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err != nil {
t.Fatalf("err: %v", err)
}
defer snap.Close()
// Read back the before value.
{
getR := structs.KeyRequest{
Datacenter: dc,
Key: "test",
QueryOptions: structs.QueryOptions{
Token: token,
},
}
var dirent structs.IndexedDirEntries
if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
if len(dirent.Entries) != 1 {
t.Fatalf("Bad: %v", dirent)
}
d := dirent.Entries[0]
if string(d.Value) != "hello" {
t.Fatalf("bad: %v", d)
}
}
// Set a key to an after value.
{
args := structs.KVSRequest{
Datacenter: dc,
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "test",
Value: []byte("goodbye"),
},
WriteRequest: structs.WriteRequest{
Token: token,
},
}
var out bool
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
// Read back the after value.
{
getR := structs.KeyRequest{
Datacenter: dc,
Key: "test",
QueryOptions: structs.QueryOptions{
Token: token,
},
}
var dirent structs.IndexedDirEntries
if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
if len(dirent.Entries) != 1 {
t.Fatalf("Bad: %v", dirent)
}
d := dirent.Entries[0]
if string(d.Value) != "goodbye" {
t.Fatalf("bad: %v", d)
}
}
// Restore the snapshot.
args.Op = structs.SnapshotRestore
restore, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
&args, snap, &reply)
if err != nil {
t.Fatalf("err: %v", err)
}
defer restore.Close()
// Read back the before value post-restore.
{
getR := structs.KeyRequest{
Datacenter: dc,
Key: "test",
QueryOptions: structs.QueryOptions{
Token: token,
},
}
var dirent structs.IndexedDirEntries
if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
if len(dirent.Entries) != 1 {
t.Fatalf("Bad: %v", dirent)
}
d := dirent.Entries[0]
if string(d.Value) != "hello" {
t.Fatalf("bad: %v", d)
}
}
}
func TestSnapshot(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC, "dc1")
verifySnapshot(t, s1, "dc1", "")
}
func TestSnapshot_LeaderState(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// Make a before session.
var before string
{
args := structs.SessionRequest{
Datacenter: s1.config.Datacenter,
Op: structs.SessionCreate,
Session: structs.Session{
Node: s1.config.NodeName,
TTL: "60s",
},
}
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &args, &before); err != nil {
t.Fatalf("err: %v", err)
}
}
// Take a snapshot.
args := structs.SnapshotRequest{
Datacenter: s1.config.Datacenter,
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
snap, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err != nil {
t.Fatalf("err: %v", err)
}
defer snap.Close()
// Make an after session.
var after string
{
args := structs.SessionRequest{
Datacenter: s1.config.Datacenter,
Op: structs.SessionCreate,
Session: structs.Session{
Node: s1.config.NodeName,
TTL: "60s",
},
}
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &args, &after); err != nil {
t.Fatalf("err: %v", err)
}
}
// Make sure the leader has timers setup.
if _, ok := s1.sessionTimers[before]; !ok {
t.Fatalf("missing session timer")
}
if _, ok := s1.sessionTimers[after]; !ok {
t.Fatalf("missing session timer")
}
// Restore the snapshot.
args.Op = structs.SnapshotRestore
restore, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
&args, snap, &reply)
if err != nil {
t.Fatalf("err: %v", err)
}
defer restore.Close()
// Make sure the before timer is still there, and that the after timer
// got reverted. This proves we fully cycled the leader state.
if _, ok := s1.sessionTimers[before]; !ok {
t.Fatalf("missing session timer")
}
if _, ok := s1.sessionTimers[after]; ok {
t.Fatalf("unexpected session timer")
}
}
func TestSnapshot_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Take a snapshot.
func() {
args := structs.SnapshotRequest{
Datacenter: "dc1",
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
}()
// Restore a snapshot.
func() {
args := structs.SnapshotRequest{
Datacenter: "dc1",
Op: structs.SnapshotRestore,
}
var reply structs.SnapshotResponse
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
}()
// With the token in place everything should go through.
verifySnapshot(t, s1, "dc1", "root")
}
func TestSnapshot_Forward_Leader(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join.
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, s1.RPC, "dc1")
testutil.WaitForLeader(t, s2.RPC, "dc1")
// Run against the leader and the follower to ensure we forward.
for _, s := range []*Server{s1, s2} {
verifySnapshot(t, s, "dc1", "")
verifySnapshot(t, s, "dc1", "")
}
}
func TestSnapshot_Forward_Datacenter(t *testing.T) {
dir1, s1 := testServerDC(t, "dc1")
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDC(t, "dc2")
defer os.RemoveAll(dir2)
defer s2.Shutdown()
testutil.WaitForLeader(t, s1.RPC, "dc1")
testutil.WaitForLeader(t, s2.RPC, "dc2")
// Try to WAN join.
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfWANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinWAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(
func() (bool, error) {
return len(s1.WANMembers()) > 1, nil
},
func(err error) {
t.Fatalf("Failed waiting for WAN join: %v", err)
})
// Run a snapshot from each server locally and remotely to ensure we
// forward.
for _, s := range []*Server{s1, s2} {
verifySnapshot(t, s, "dc1", "")
verifySnapshot(t, s, "dc2", "")
}
}
func TestSnapshot_AllowStale(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Run against the servers which haven't been set up to establish
// a leader and make sure we get a no leader error.
for _, s := range []*Server{s1, s2} {
// Take a snapshot.
args := structs.SnapshotRequest{
Datacenter: s.config.Datacenter,
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), structs.ErrNoLeader.Error()) {
t.Fatalf("err: %v", err)
}
}
// Run in stale mode and make sure we get an error from Raft (snapshot
// was attempted), and not a no leader error.
for _, s := range []*Server{s1, s2} {
// Take a snapshot.
args := structs.SnapshotRequest{
Datacenter: s.config.Datacenter,
AllowStale: true,
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), "Raft error when taking snapshot") {
t.Fatalf("err: %v", err)
}
}
}
func TestSnapshot_DevMode(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.DevMode = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
args := structs.SnapshotRequest{
Datacenter: s1.config.Datacenter,
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), noSnapshotsInDevMode) {
t.Fatalf("err: %v", err)
}
}


@ -0,0 +1,40 @@
package structs
type SnapshotOp int
const (
SnapshotSave SnapshotOp = iota
SnapshotRestore
)
// SnapshotRequest is used as a header for a snapshot RPC request. This will
// precede any streaming data that's part of the request and is msgpack-encoded
// on the wire.
type SnapshotRequest struct {
// Datacenter is the target datacenter for this request. The request
// will be forwarded if necessary.
Datacenter string
// Token is the ACL token to use for the operation. If ACLs are enabled
// then all operations require a management token.
Token string
// If set, any follower can service the request. Results may be
// arbitrarily stale. Only applies to SnapshotSave.
AllowStale bool
// Op is the operation code for the RPC.
Op SnapshotOp
}
// SnapshotResponse is used as a header for a snapshot RPC response. This will
// precede any streaming data that's part of the response and is msgpack-encoded
// on the wire.
type SnapshotResponse struct {
// Error is the overall error status of the RPC request.
Error string
// QueryMeta has freshness information about the server that handled the
// request. It is only filled in for a SnapshotSave.
QueryMeta
}

New binary test fixtures (stored with Git LFS; contents not shown):
test/snapshot/corrupt-meta.tar
test/snapshot/corrupt-sha.tar
test/snapshot/corrupt-state.tar
test/snapshot/empty.tar
test/snapshot/extra.tar
test/snapshot/missing-meta.tar
test/snapshot/missing-sha.tar
test/snapshot/missing-state.tar


@ -1,10 +1,10 @@
DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)
test:
go test -timeout=45s ./...
go test -timeout=60s ./...
integ: test
INTEG_TESTS=yes go test -timeout=3s -run=Integ ./...
INTEG_TESTS=yes go test -timeout=5s -run=Integ ./...
deps:
go get -d -v ./...


@ -3,6 +3,7 @@ package raft
import (
"errors"
"fmt"
"io"
"log"
"os"
"strconv"
@ -25,6 +26,10 @@ var (
// because it's been deposed in the process.
ErrLeadershipLost = errors.New("leadership lost while committing log")
// ErrAbortedByRestore is returned when a leader fails to commit a log
// entry because it's been superseded by a user snapshot restore.
ErrAbortedByRestore = errors.New("snapshot restored while committing log")
// ErrRaftShutdown is returned when operations are requested against an
// inactive Raft.
ErrRaftShutdown = errors.New("raft is already shutdown")
@ -64,11 +69,14 @@ type Raft struct {
// FSM is the client state machine to apply commands to
fsm FSM
// fsmCommitCh is used to trigger async application of logs to the fsm
fsmCommitCh chan commitTuple
// fsmRestoreCh is used to trigger a restore from snapshot
fsmRestoreCh chan *restoreFuture
// fsmMutateCh is used to send state-changing updates to the FSM. This
// receives pointers to commitTuple structures when applying logs or
// pointers to restoreFuture structures when restoring a snapshot. We
// need control over the order of these operations when doing user
// restores so that we finish applying any old log applies before we
// take a user snapshot on the leader, otherwise we might restore the
// snapshot and apply old logs to it that were in the pipe.
fsmMutateCh chan interface{}
// fsmSnapshotCh is used to trigger a new snapshot being taken
fsmSnapshotCh chan *reqSnapshotFuture
@ -118,8 +126,12 @@ type Raft struct {
// snapshots is used to store and retrieve snapshots
snapshots SnapshotStore
// snapshotCh is used for user triggered snapshots
snapshotCh chan *snapshotFuture
// userSnapshotCh is used for user-triggered snapshots
userSnapshotCh chan *userSnapshotFuture
// userRestoreCh is used for user-triggered restores of external
// snapshots
userRestoreCh chan *userRestoreFuture
// stable is a StableStore implementation for durable state
// It provides stable storage for many fields in raftState
@ -429,8 +441,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
applyCh: make(chan *logFuture),
conf: *conf,
fsm: fsm,
fsmCommitCh: make(chan commitTuple, 128),
fsmRestoreCh: make(chan *restoreFuture),
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
localID: localID,
@ -441,7 +452,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
configurations: configurations{},
rpcCh: trans.Consumer(),
snapshots: snaps,
snapshotCh: make(chan *snapshotFuture),
userSnapshotCh: make(chan *userSnapshotFuture),
userRestoreCh: make(chan *userRestoreFuture),
shutdownCh: make(chan struct{}),
stable: stable,
trans: trans,
@ -792,18 +804,78 @@ func (r *Raft) Shutdown() Future {
return &shutdownFuture{nil}
}
// Snapshot is used to manually force Raft to take a snapshot.
// Returns a future that can be used to block until complete.
func (r *Raft) Snapshot() Future {
snapFuture := &snapshotFuture{}
snapFuture.init()
// Snapshot is used to manually force Raft to take a snapshot. Returns a future
// that can be used to block until complete, and that contains a function that
// can be used to open the snapshot.
func (r *Raft) Snapshot() SnapshotFuture {
future := &userSnapshotFuture{}
future.init()
select {
case r.snapshotCh <- snapFuture:
return snapFuture
case r.userSnapshotCh <- future:
return future
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
future.respond(ErrRaftShutdown)
return future
}
}
// Restore is used to manually force Raft to consume an external snapshot, such
// as if restoring from a backup. We will use the current Raft configuration,
// not the one from the snapshot, so that we can restore into a new cluster. We
// will also use the higher of the index of the snapshot, or the current index,
// and then add 1 to that, so we force a new state with a hole in the Raft log,
// so that the snapshot will be sent to followers and used for any new joiners.
// This can only be run on the leader, and blocks until the restore is complete
// or an error occurs.
//
// WARNING! This operation has the leader take on the state of the snapshot and
// then sets itself up so that it replicates that to its followers through the
// install snapshot process. This involves a potentially dangerous period where
// the leader commits ahead of its followers, so should only be used for disaster
// recovery into a fresh cluster, and should not be used in normal operations.
func (r *Raft) Restore(meta *SnapshotMeta, reader io.Reader, timeout time.Duration) error {
metrics.IncrCounter([]string{"raft", "restore"}, 1)
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}
// Perform the restore.
restore := &userRestoreFuture{
meta: meta,
reader: reader,
}
restore.init()
select {
case <-timer:
return ErrEnqueueTimeout
case <-r.shutdownCh:
return ErrRaftShutdown
case r.userRestoreCh <- restore:
// If the restore is ingested then wait for it to complete.
if err := restore.Error(); err != nil {
return err
}
}
// Apply a no-op log entry. Waiting for this allows us to wait until the
// followers have gotten the restore and replicated at least this new
// entry, which shows that we've also faulted and installed the
// snapshot with the contents of the restore.
noop := &logFuture{
log: Log{
Type: LogNoop,
},
}
noop.init()
select {
case <-timer:
return ErrEnqueueTimeout
case <-r.shutdownCh:
return ErrRaftShutdown
case r.applyCh <- noop:
return noop.Error()
}
}
// State is used to return the current raft state.
@ -870,7 +942,7 @@ func (r *Raft) Stats() map[string]string {
"last_log_term": toString(lastLogTerm),
"commit_index": toString(r.getCommitIndex()),
"applied_index": toString(r.getLastApplied()),
"fsm_pending": toString(uint64(len(r.fsmCommitCh))),
"fsm_pending": toString(uint64(len(r.fsmMutateCh))),
"last_snapshot_index": toString(lastSnapIndex),
"last_snapshot_term": toString(lastSnapTerm),
"protocol_version": toString(uint64(r.protocolVersion)),


@ -48,67 +48,87 @@ type FSMSnapshot interface {
// the FSM to block our internal operations.
func (r *Raft) runFSM() {
var lastIndex, lastTerm uint64
commit := func(req *commitTuple) {
// Apply the log if a command
var resp interface{}
if req.log.Type == LogCommand {
start := time.Now()
resp = r.fsm.Apply(req.log)
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
}
// Update the indexes
lastIndex = req.log.Index
lastTerm = req.log.Term
// Invoke the future if given
if req.future != nil {
req.future.response = resp
req.future.respond(nil)
}
}
restore := func(req *restoreFuture) {
// Open the snapshot
meta, source, err := r.snapshots.Open(req.ID)
if err != nil {
req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
return
}
// Attempt to restore
start := time.Now()
if err := r.fsm.Restore(source); err != nil {
req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
source.Close()
return
}
source.Close()
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
// Update the last index and term
lastIndex = meta.Index
lastTerm = meta.Term
req.respond(nil)
}
snapshot := func(req *reqSnapshotFuture) {
// Is there something to snapshot?
if lastIndex == 0 {
req.respond(ErrNothingNewToSnapshot)
return
}
// Start a snapshot
start := time.Now()
snap, err := r.fsm.Snapshot()
metrics.MeasureSince([]string{"raft", "fsm", "snapshot"}, start)
// Respond to the request
req.index = lastIndex
req.term = lastTerm
req.snapshot = snap
req.respond(err)
}
for {
select {
case req := <-r.fsmRestoreCh:
// Open the snapshot
meta, source, err := r.snapshots.Open(req.ID)
if err != nil {
req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
continue
}
case ptr := <-r.fsmMutateCh:
switch req := ptr.(type) {
case *commitTuple:
commit(req)
// Attempt to restore
start := time.Now()
if err := r.fsm.Restore(source); err != nil {
req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
source.Close()
continue
}
source.Close()
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
case *restoreFuture:
restore(req)
// Update the last index and term
lastIndex = meta.Index
lastTerm = meta.Term
req.respond(nil)
default:
panic(fmt.Errorf("bad type passed to fsmMutateCh: %#v", ptr))
}
case req := <-r.fsmSnapshotCh:
// Is there something to snapshot?
if lastIndex == 0 {
req.respond(ErrNothingNewToSnapshot)
continue
}
snapshot(req)
// Start a snapshot
start := time.Now()
snap, err := r.fsm.Snapshot()
metrics.MeasureSince([]string{"raft", "fsm", "snapshot"}, start)
// Respond to the request
req.index = lastIndex
req.term = lastTerm
req.snapshot = snap
req.respond(err)
case commitEntry := <-r.fsmCommitCh:
// Apply the log if a command
var resp interface{}
if commitEntry.log.Type == LogCommand {
start := time.Now()
resp = r.fsm.Apply(commitEntry.log)
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
}
// Update the indexes
lastIndex = commitEntry.log.Index
lastTerm = commitEntry.log.Term
// Invoke the future if given
if commitEntry.future != nil {
commitEntry.future.response = resp
commitEntry.future.respond(nil)
}
case <-r.shutdownCh:
return
}


@ -1,6 +1,8 @@
package raft
import (
"fmt"
"io"
"sync"
"time"
)
@ -46,6 +48,16 @@ type ConfigurationFuture interface {
Configuration() Configuration
}
// SnapshotFuture is used for waiting on a user-triggered snapshot to complete.
type SnapshotFuture interface {
Future
// Open is a function you can call to access the underlying snapshot and
// its metadata. This must not be called until after the Error method
// has returned.
Open() (*SnapshotMeta, io.ReadCloser, error)
}
// errorFuture is used to return a static error.
type errorFuture struct {
err error
@ -150,9 +162,41 @@ func (s *shutdownFuture) Error() error {
return nil
}
// snapshotFuture is used for waiting on a snapshot to complete.
type snapshotFuture struct {
// userSnapshotFuture is used for waiting on a user-triggered snapshot to
// complete.
type userSnapshotFuture struct {
deferError
// opener is a function used to open the snapshot. This is filled in
// once the future returns with no error.
opener func() (*SnapshotMeta, io.ReadCloser, error)
}
// Open is a function you can call to access the underlying snapshot and its
// metadata.
func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) {
if u.opener == nil {
return nil, nil, fmt.Errorf("no snapshot available")
} else {
// Invalidate the opener so it can't get called multiple times,
// which isn't generally safe.
defer func() {
u.opener = nil
}()
return u.opener()
}
}
// userRestoreFuture is used for waiting on a user-triggered restore of an
// external snapshot to complete.
type userRestoreFuture struct {
deferError
// meta is the metadata that belongs with the snapshot.
meta *SnapshotMeta
// reader is the interface to read the snapshot contents from.
reader io.Reader
}
// reqSnapshotFuture is used for requesting a snapshot start.


@ -160,6 +160,10 @@ func (r *Raft) runFollower() {
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)
case r := <-r.userRestoreCh:
// Reject any restores since we are not the leader
r.respond(ErrNotLeader)
case c := <-r.configurationsCh:
c.configurations = r.configurations.Clone()
c.respond(nil)
@ -283,6 +287,10 @@ func (r *Raft) runCandidate() {
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)
case r := <-r.userRestoreCh:
// Reject any restores since we are not the leader
r.respond(ErrNotLeader)
case c := <-r.configurationsCh:
c.configurations = r.configurations.Clone()
c.respond(nil)
@ -550,6 +558,10 @@ func (r *Raft) leaderLoop() {
v.respond(nil)
}
case future := <-r.userRestoreCh:
err := r.restoreUserSnapshot(future.meta, future.reader)
future.respond(err)
case c := <-r.configurationsCh:
c.configurations = r.configurations.Clone()
c.respond(nil)
@ -680,6 +692,102 @@ func (r *Raft) quorumSize() int {
return voters/2 + 1
}
// restoreUserSnapshot is used to manually consume an external snapshot, such
// as if restoring from a backup. We will use the current Raft configuration,
// not the one from the snapshot, so that we can restore into a new cluster. We
// will also use the higher of the index of the snapshot, or the current index,
// and then add 1 to that, so we force a new state with a hole in the Raft log,
// so that the snapshot will be sent to followers and used for any new joiners.
// This can only be run on the leader, and blocks until the restore is complete
// or an error occurs.
func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
defer metrics.MeasureSince([]string{"raft", "restoreUserSnapshot"}, time.Now())
// Sanity check the version.
version := meta.Version
if version < SnapshotVersionMin || version > SnapshotVersionMax {
return fmt.Errorf("unsupported snapshot version %d", version)
}
// We don't support snapshots while there's a config change
// outstanding since the snapshot doesn't have a means to
// represent this state.
committedIndex := r.configurations.committedIndex
latestIndex := r.configurations.latestIndex
if committedIndex != latestIndex {
return fmt.Errorf("cannot restore snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
latestIndex, committedIndex)
}
// Cancel any inflight requests.
for {
e := r.leaderState.inflight.Front()
if e == nil {
break
}
e.Value.(*logFuture).respond(ErrAbortedByRestore)
r.leaderState.inflight.Remove(e)
}
// We will overwrite the snapshot metadata with the current term,
// an index that's greater than the current index, or the last
// index in the snapshot. It's important that we leave a hole in
// the index so we know there's nothing in the Raft log there and
// replication will fault and send the snapshot.
term := r.getCurrentTerm()
lastIndex := r.getLastIndex()
if meta.Index > lastIndex {
lastIndex = meta.Index
}
lastIndex++
// Dump the snapshot. Note that we use the latest configuration,
// not the one that came with the snapshot.
sink, err := r.snapshots.Create(version, lastIndex, term,
r.configurations.latest, r.configurations.latestIndex, r.trans)
if err != nil {
return fmt.Errorf("failed to create snapshot: %v", err)
}
n, err := io.Copy(sink, reader)
if err != nil {
sink.Cancel()
return fmt.Errorf("failed to write snapshot: %v", err)
}
if n != meta.Size {
sink.Cancel()
return fmt.Errorf("failed to write snapshot, size didn't match (%d != %d)", n, meta.Size)
}
if err := sink.Close(); err != nil {
return fmt.Errorf("failed to close snapshot: %v", err)
}
r.logger.Printf("[INFO] raft: Copied %d bytes to local snapshot", n)
// Restore the snapshot into the FSM. If this fails we are in a
// bad state so we panic to take ourselves out.
fsm := &restoreFuture{ID: sink.ID()}
fsm.init()
select {
case r.fsmMutateCh <- fsm:
case <-r.shutdownCh:
return ErrRaftShutdown
}
if err := fsm.Error(); err != nil {
panic(fmt.Errorf("failed to restore snapshot: %v", err))
}
// We set the last log so it looks like we've stored the empty
// index we burned. The last applied is set because we made the
// FSM take the snapshot state, and we store the last snapshot
// in the stable store since we created a snapshot as part of
// this process.
r.setLastLog(lastIndex, term)
r.setLastApplied(lastIndex)
r.setLastSnapshot(lastIndex, term)
r.logger.Printf("[INFO] raft: Restored user snapshot (index %d)", lastIndex)
return nil
}
// appendConfigurationEntry changes the configuration and adds a new
// configuration entry to the log. This must only be called from the
// main thread.
@ -804,7 +912,7 @@ func (r *Raft) processLog(l *Log, future *logFuture) {
case LogCommand:
// Forward to the fsm handler
select {
case r.fsmCommitCh <- commitTuple{l, future}:
case r.fsmMutateCh <- &commitTuple{l, future}:
case <-r.shutdownCh:
if future != nil {
future.respond(ErrRaftShutdown)
@ -1204,7 +1312,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
future := &restoreFuture{ID: sink.ID()}
future.init()
select {
case r.fsmRestoreCh <- future:
case r.fsmMutateCh <- future:
case <-r.shutdownCh:
future.respond(ErrRaftShutdown)
return


@ -76,15 +76,19 @@ func (r *Raft) runSnapshots() {
}
// Trigger a snapshot
if err := r.takeSnapshot(); err != nil {
if _, err := r.takeSnapshot(); err != nil {
r.logger.Printf("[ERR] raft: Failed to take snapshot: %v", err)
}
case future := <-r.snapshotCh:
case future := <-r.userSnapshotCh:
// User-triggered, run immediately
err := r.takeSnapshot()
id, err := r.takeSnapshot()
if err != nil {
r.logger.Printf("[ERR] raft: Failed to take snapshot: %v", err)
} else {
future.opener = func() (*SnapshotMeta, io.ReadCloser, error) {
return r.snapshots.Open(id)
}
}
future.respond(err)
@ -113,8 +117,9 @@ func (r *Raft) shouldSnapshot() bool {
}
// takeSnapshot is used to take a new snapshot. This must only be called from
// the snapshot thread, never the main thread.
func (r *Raft) takeSnapshot() error {
// the snapshot thread, never the main thread. This returns the ID of the new
// snapshot, along with an error.
func (r *Raft) takeSnapshot() (string, error) {
defer metrics.MeasureSince([]string{"raft", "snapshot", "takeSnapshot"}, time.Now())
// Create a request for the FSM to perform a snapshot.
@ -125,7 +130,7 @@ func (r *Raft) takeSnapshot() error {
select {
case r.fsmSnapshotCh <- snapReq:
case <-r.shutdownCh:
return ErrRaftShutdown
return "", ErrRaftShutdown
}
// Wait until we get a response
@ -133,7 +138,7 @@ func (r *Raft) takeSnapshot() error {
if err != ErrNothingNewToSnapshot {
err = fmt.Errorf("failed to start snapshot: %v", err)
}
return err
return "", err
}
defer snapReq.snapshot.Release()
@ -145,10 +150,10 @@ func (r *Raft) takeSnapshot() error {
select {
case r.configurationsCh <- configReq:
case <-r.shutdownCh:
return ErrRaftShutdown
return "", ErrRaftShutdown
}
if err := configReq.Error(); err != nil {
return err
return "", err
}
committed := configReq.configurations.committed
committedIndex := configReq.configurations.committedIndex
@ -162,7 +167,7 @@ func (r *Raft) takeSnapshot() error {
// then it's not crucial that we snapshot, since there's not much going
// on Raft-wise.
if snapReq.index < committedIndex {
return fmt.Errorf("cannot take snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
return "", fmt.Errorf("cannot take snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
committedIndex, snapReq.index)
}
@ -172,7 +177,7 @@ func (r *Raft) takeSnapshot() error {
version := getSnapshotVersion(r.protocolVersion)
sink, err := r.snapshots.Create(version, snapReq.index, snapReq.term, committed, committedIndex, r.trans)
if err != nil {
return fmt.Errorf("failed to create snapshot: %v", err)
return "", fmt.Errorf("failed to create snapshot: %v", err)
}
metrics.MeasureSince([]string{"raft", "snapshot", "create"}, start)
@ -180,13 +185,13 @@ func (r *Raft) takeSnapshot() error {
start = time.Now()
if err := snapReq.snapshot.Persist(sink); err != nil {
sink.Cancel()
return fmt.Errorf("failed to persist snapshot: %v", err)
return "", fmt.Errorf("failed to persist snapshot: %v", err)
}
metrics.MeasureSince([]string{"raft", "snapshot", "persist"}, start)
// Close and check for error.
if err := sink.Close(); err != nil {
return fmt.Errorf("failed to close snapshot: %v", err)
return "", fmt.Errorf("failed to close snapshot: %v", err)
}
// Update the last stable snapshot info.
@ -194,11 +199,11 @@ func (r *Raft) takeSnapshot() error {
// Compact the logs.
if err := r.compactLogs(snapReq.index); err != nil {
return err
return "", err
}
r.logger.Printf("[INFO] raft: Snapshot to %d complete", snapReq.index)
return nil
return sink.ID(), nil
}
// compactLogs takes the last inclusive index of a snapshot

vendor/vendor.json (vendored)

@ -350,10 +350,10 @@
"revisionTime": "2015-11-16T02:03:38Z"
},
{
"checksumSHA1": "zMgiTV0dfJIQNRCJDF50bLomDvg=",
"checksumSHA1": "tHzyGCXkf8PnmBrTk1Z01JIv/5Q=",
"path": "github.com/hashicorp/raft",
"revision": "c69c15dd73b6695ba75b3502ce6b332cc0042c83",
"revisionTime": "2016-08-01T21:27:18Z"
"revision": "e1d3debe52b9152e8db5c3a77b7f7cf9b2a8b404",
"revisionTime": "2016-10-26T00:17:15Z"
},
{
"checksumSHA1": "QAxukkv54/iIvLfsUP6IK4R0m/A=",


@ -24,6 +24,7 @@ Each endpoint manages a different aspect of Consul:
* [operator](http/operator.html) - Consul Operator Tools
* [query](http/query.html) - Prepared Queries
* [session](http/session.html) - Sessions
* [snapshots](http/snapshot.html) - Consul Snapshots for Disaster Recovery
* [status](http/status.html) - Consul System Status
Each of these is documented in detail at the links above. Consul also has a number


@ -0,0 +1,97 @@
---
layout: "docs"
page_title: "Consul Snapshots (HTTP)"
sidebar_current: "docs-agent-http-snapshots"
description: >
The Snapshot endpoints are used to save and restore Consul's server state for disaster recovery.
---
# Snapshot HTTP Endpoint
The Snapshot endpoints are used to save and restore the state of the Consul
servers for disaster recovery. Snapshots include all state managed by Consul's
Raft [consensus protocol](/docs/internals/consensus.html), including:
* Key/Value Entries
* Service Catalog
* Prepared Queries
* Sessions
* ACLs
Available in Consul 0.7.1 and later, these endpoints allow for atomic,
point-in-time snapshots of the above data in a format that can be saved
externally. Snapshots can then be used to restore the server state into a fresh
cluster of Consul servers in the event of a disaster.
The following endpoints are supported:
* [`/v1/snapshot`](#snapshot): Save and restore Consul server state
These endpoints do not support blocking queries. Saving snapshots uses the
consistent mode by default and stale mode is supported.
The endpoints support the use of ACL Tokens. Because snapshots contain all
server state, including ACLs, a management token is required to perform snapshot
operations if ACLs are enabled.
-> Snapshot operations are not available for servers running in
[dev mode](/docs/agent/options.html#_dev).
### <a name="snapshot"></a> /v1/snapshot
The snapshot endpoint supports the `GET` and `PUT` methods.
#### GET Method
When using the `GET` method, Consul will perform an atomic, point-in-time
snapshot of the Consul server state.
Snapshots are exposed as gzipped tar archives which internally contain the Raft
metadata required to restore, as well as a binary serialized version of the Consul
server state. The contents are covered internally by SHA-256 hashes. These hashes
are verified during snapshot restore operations. The structure of the archive is
internal to Consul and not intended to be used other than for restore operations.
In particular, the archives are not designed to be modified before a restore.
By default, the datacenter of the agent is queried; however, the `dc` can be
provided using the "?dc=" query parameter.
By default, snapshots use consistent mode which means the request is internally
forwarded to the cluster leader, and leadership is checked before performing the
snapshot. If `stale` is specified using the "?stale" query parameter, then any
server can handle the request and the results may be arbitrarily stale. To support
bounding the acceptable staleness of snapshots, responses provide the `X-Consul-LastContact`
header containing the time in milliseconds that a server was last contacted by
the leader node. The `X-Consul-KnownLeader` header also indicates if there is a
known leader. These can be used by clients to gauge the staleness of a snapshot
and take appropriate action. The stale mode is particularly useful for taking a
snapshot of a cluster in a failed state with no current leader.
If ACLs are enabled, the client will need to supply an ACL Token with management
privileges.
The return code is 200 on success, and the snapshot will be returned in the body
as a gzipped tar archive. In addition to the stale-related headers described above,
the `X-Consul-Index` header will also be set to the index at which the snapshot took
place.
#### PUT Method
When using the `PUT` method, Consul will atomically restore a point-in-time
snapshot of the Consul server state.
Restores involve a potentially dangerous low-level Raft operation that is not
designed to handle server failures during a restore. This operation is primarily
intended to be used when recovering from a disaster, restoring into a fresh
cluster of Consul servers.
By default, the datacenter of the agent is targeted; however, the `dc` can be
provided using the "?dc=" query parameter.
If ACLs are enabled, the client will need to supply an ACL Token with management
privileges.
The body of the request should be a snapshot archive returned from a previous call
to the `GET` method.
The return code is 200 on success.
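As a rough illustration only (the agent address and file name below are assumptions, and an ACL token may also be required as described above), the save and restore flow can be driven from any HTTP client, for example in Go:
```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

func main() {
	// Save: GET /v1/snapshot streams back the gzipped tar archive.
	resp, err := http.Get("http://127.0.0.1:8500/v1/snapshot")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("snapshot index:", resp.Header.Get("X-Consul-Index"))

	out, err := os.Create("backup.snap")
	if err != nil {
		panic(err)
	}
	if _, err := io.Copy(out, resp.Body); err != nil {
		panic(err)
	}
	out.Close()

	// Restore: PUT the same archive back to /v1/snapshot.
	in, err := os.Open("backup.snap")
	if err != nil {
		panic(err)
	}
	defer in.Close()
	req, err := http.NewRequest("PUT", "http://127.0.0.1:8500/v1/snapshot", in)
	if err != nil {
		panic(err)
	}
	putResp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer putResp.Body.Close()
	fmt.Println("restore status:", putResp.Status)
}
```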


@ -14,7 +14,7 @@ and deleting from the store. This command is available in Consul 0.7.1 and
later.
The key-value store is also accessible via the
[HTTP API](docs/agent/http/kv.html).
[HTTP API](/docs/agent/http/kv.html).
## Usage


@ -0,0 +1,59 @@
---
layout: "docs"
page_title: "Commands: Snapshot"
sidebar_current: "docs-commands-snapshot"
---
# Consul Snapshot
Command: `consul snapshot`
The `snapshot` command has subcommands for saving and restoring the state of the
Consul servers for disaster recovery. These are atomic, point-in-time snapshots
which include key/value entries, service catalog, prepared queries, sessions, and
ACLs. This command is available in Consul 0.7.1 and later.
Snapshots are also accessible via the [HTTP API](/docs/agent/http/snapshot.html).
## Usage
Usage: `consul snapshot <subcommand>`
For the exact documentation for your Consul version, run `consul snapshot -h` to
view the complete list of subcommands.
```text
Usage: consul snapshot <subcommand> [options] [args]
# ...
Subcommands:
restore Restores snapshot of Consul server state
save Saves snapshot of Consul server state
```
For more information, examples, and usage about a subcommand, click on the name
of the subcommand in the sidebar or one of the links below:
- [restore](/docs/commands/snapshot/restore.html)
- [save](/docs/commands/snapshot/save.html)
## Basic Examples
To create a snapshot and save it as a file called "backup.snap":
```text
$ consul snapshot save backup.snap
Saved and verified snapshot to index 8419
```
To restore a snapshot from a file called "backup.snap":
```text
$ consul snapshot restore backup.snap
Restored snapshot
```
For more examples, ask for subcommand help or view the subcommand documentation
by clicking on one of the links in the sidebar.


@ -0,0 +1,42 @@
---
layout: "docs"
page_title: "Commands: Snapshot Restore"
sidebar_current: "docs-commands-snapshot-restore"
---
# Consul Snapshot Restore
Command: `consul snapshot restore`
The `snapshot restore` command is used to restore an atomic, point-in-time
snapshot of the state of the Consul servers which includes key/value entries,
service catalog, prepared queries, sessions, and ACLs. The snapshot is read
from the given file.
Restores involve a potentially dangerous low-level Raft operation that is not
designed to handle server failures during a restore. This command is primarily
intended to be used when recovering from a disaster, restoring into a fresh
cluster of Consul servers.
If ACLs are enabled, a management token must be supplied in order to perform
a snapshot restore.
## Usage
Usage: `consul snapshot restore [options] FILE`
#### API Options
<%= partial "docs/commands/http_api_options" %>
## Examples
To restore a snapshot from the file "backup.snap":
```text
$ consul snapshot restore backup.snap
Restored snapshot
```
Please see the [HTTP API](/docs/agent/http/snapshot.html) documentation for
more details about snapshot internals.


@ -0,0 +1,56 @@
---
layout: "docs"
page_title: "Commands: Snapshot Save"
sidebar_current: "docs-commands-snapshot-save"
---
# Consul Snapshot Save
Command: `consul snapshot save`
The `snapshot save` command is used to retrieve an atomic, point-in-time snapshot
of the state of the Consul servers which includes key/value entries,
service catalog, prepared queries, sessions, and ACLs. The snapshot is saved to
the given file.
If ACLs are enabled, a management token must be supplied in order to perform
a snapshot save.
## Usage
Usage: `consul snapshot save [options] FILE`
#### API Options
<%= partial "docs/commands/http_api_options" %>
## Examples
To create a snapshot from the leader server and save it to "backup.snap":
```text
$ consul snapshot save backup.snap
Saved and verified snapshot to index 8419
```
By default, snapshots are taken using a consistent mode that forwards requests
to the leader and the leader verifies it is still in power before taking the
snapshot.
After the snapshot is written to the given file it is read back and verified for
integrity.
To create a potentially stale snapshot from any available server, use the stale
consistency mode:
```text
$ consul snapshot save -stale backup.snap
# ...
```
This is useful for situations where a cluster is in a degraded state and no
leader is available. To target a specific server for a snapshot, you can run
the `consul snapshot save` command on that specific server.
Please see the [HTTP API](/docs/agent/http/snapshot.html) documentation for
more details about snapshot internals.


@ -149,6 +149,18 @@
<a href="/docs/commands/rtt.html">rtt</a>
</li>
<li<%= sidebar_current("docs-commands-snapshot") %>>
<a href="/docs/commands/snapshot.html">snapshot</a>
<ul class="subnav">
<li<%= sidebar_current("docs-commands-snapshot-restore") %>>
<a href="/docs/commands/snapshot/restore.html">restore</a>
</li>
<li<%= sidebar_current("docs-commands-snapshot-save") %>>
<a href="/docs/commands/snapshot/save.html">save</a>
</li>
</ul>
</li>
<li<%= sidebar_current("docs-commands-watch") %>>
<a href="/docs/commands/watch.html">watch</a>
</li>
@ -198,7 +210,7 @@
</li>
<li<%= sidebar_current("docs-agent-http-operator") %>>
<a href="/docs/agent/http/operator.html">Operator </a>
<a href="/docs/agent/http/operator.html">Operator</a>
</li>
<li<%= sidebar_current("docs-agent-http-query") %>>
@ -209,6 +221,10 @@
<a href="/docs/agent/http/session.html">Sessions</a>
</li>
<li<%= sidebar_current("docs-agent-http-snapshot") %>>
<a href="/docs/agent/http/snapshot.html">Snapshots</a>
</li>
<li<%= sidebar_current("docs-agent-http-status") %>>
<a href="/docs/agent/http/status.html">Status</a>
</li>