From bc29610124a2cbf32ec5e75815c78dfe9a618cf5 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 25 Oct 2016 19:20:24 -0700 Subject: [PATCH] Adds support for snapshots and restores. (#2396) * Updates Raft library to get new snapshot/restore API. * Basic backup and restore working, but need some cleanup. * Breaks out a snapshot module and adds a SHA256 integrity check. * Adds snapshot ACL and fills in some missing comments. * Require a consistent read for snapshots. * Make sure snapshot works if ACLs aren't enabled. * Adds a bit of package documentation. * Returns an empty response from restore to avoid EOF errors. * Adds API client support for snapshots. * Makes internal file names match on-disk file snapshots. * Adds DC and token coverage for snapshot API test. * Adds missing documentation. * Adds a unit test for the snapshot client endpoint. * Moves the connection pool out of the client for easier testing. * Fixes an incidental issue in the prepared query unit test. I realized I had two servers in bootstrap mode so this wasn't a good setup. * Adds a half close to the TCP stream and fixes panic on error. * Adds client and endpoint tests for snapshots. * Moves the pool back into the snapshot RPC client. * Adds a TLS test and fixes half-closes for TLS connections. * Tweaks some comments. * Adds a low-level snapshot test. This is independent of Consul so we can pull this out into a library later if we want to. * Cleans up snapshot and archive and completes archive tests. * Sends a clear error for snapshot operations in dev mode. Snapshots require the Raft snapshots to be readable, which isn't supported in dev mode. Send a clear error instead of a deep-down Raft one. * Adds docs for the snapshot endpoint. * Adds a stale mode and index feedback for snapshot saves. This gives folks a way to extract data even if the cluster has no leader. * Changes the internal format of a snapshot from zip to tgz. * Pulls in Raft fix to cancel inflight before a restore. * Pulls in new Raft restore interface. * Adds metadata to snapshot saves and a verify function. * Adds basic save and restore snapshot CLI commands. * Gets rid of tarball extensions and adds restore message. * Fixes an incidental bad link in the KV docs. * Adds documentation for the snapshot CLI commands. * Scuttle any request body when a snapshot is saved. * Fixes archive unit test error message check. * Allows for nil output writers in snapshot RPC handlers. * Renames hash list Decode to DecodeAndVerify. * Closes the client connection for snapshot ops. * Lowers timeout for restore ops. * Updates Raft vendor to get new Restore signature and integrates with Consul. * Bounces the leader's internal state when we do a restore. 
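Reviewer note (not part of the patch): the new api package surface is small — Client.Snapshot() returns a handle with Save and Restore, both defined in api/snapshot.go below. A minimal sketch of a save-then-restore round trip against a local agent with default config; the file name is an assumption:

```go
package main

import (
	"io"
	"log"
	"os"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Save: Save returns an io.ReadCloser that streams the snapshot; the
	// caller is responsible for closing it.
	snap, meta, err := client.Snapshot().Save(nil)
	if err != nil {
		log.Fatal(err)
	}
	defer snap.Close()

	f, err := os.Create("backup.snap") // assumed file name
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	if _, err := io.Copy(f, snap); err != nil {
		log.Fatal(err)
	}
	log.Printf("saved snapshot at index %d", meta.LastIndex)

	// Restore: stream the saved file back into the cluster.
	in, err := os.Open("backup.snap")
	if err != nil {
		log.Fatal(err)
	}
	defer in.Close()
	if err := client.Snapshot().Restore(nil, in); err != nil {
		log.Fatal(err)
	}
}
```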
--- acl/acl.go | 12 + acl/acl_test.go | 12 + api/snapshot.go | 47 ++ api/snapshot_test.go | 134 ++++++ command/agent/agent.go | 13 + command/agent/http.go | 117 +++-- command/agent/snapshot_endpoint.go | 50 +++ command/agent/snapshot_endpoint_test.go | 142 ++++++ command/snapshot_command.go | 48 ++ command/snapshot_command_test.go | 15 + command/snapshot_restore.go | 103 +++++ command/snapshot_restore_test.go | 103 +++++ command/snapshot_save.go | 132 ++++++ command/snapshot_save_test.go | 94 ++++ commands.go | 18 + consul/client.go | 46 ++ consul/client_test.go | 106 +++++ consul/pool.go | 32 +- consul/prepared_query_endpoint_test.go | 4 +- consul/rpc.go | 47 +- consul/server.go | 34 ++ consul/snapshot/archive.go | 223 ++++++++++ consul/snapshot/archive_test.go | 134 ++++++ consul/snapshot/snapshot.go | 193 ++++++++ consul/snapshot/snapshot_test.go | 297 +++++++++++++ consul/snapshot_endpoint.go | 224 ++++++++++ consul/snapshot_endpoint_test.go | 416 ++++++++++++++++++ consul/structs/snapshot.go | 40 ++ test/snapshot/corrupt-meta.tar | 3 + test/snapshot/corrupt-sha.tar | 3 + test/snapshot/corrupt-state.tar | 3 + test/snapshot/empty.tar | 3 + test/snapshot/extra.tar | 3 + test/snapshot/missing-meta.tar | 3 + test/snapshot/missing-sha.tar | 3 + test/snapshot/missing-state.tar | 3 + vendor/github.com/hashicorp/raft/Makefile | 4 +- vendor/github.com/hashicorp/raft/api.go | 110 ++++- vendor/github.com/hashicorp/raft/fsm.go | 128 +++--- vendor/github.com/hashicorp/raft/future.go | 48 +- vendor/github.com/hashicorp/raft/raft.go | 112 ++++- vendor/github.com/hashicorp/raft/snapshot.go | 35 +- vendor/vendor.json | 6 +- website/source/docs/agent/http.html.markdown | 1 + .../docs/agent/http/snapshot.html.markdown | 97 ++++ website/source/docs/commands/kv.html.markdown | 2 +- .../docs/commands/snapshot.html.markdown | 59 +++ .../snapshot/restore.html.markdown.erb | 42 ++ .../commands/snapshot/save.html.markdown.erb | 56 +++ website/source/layouts/docs.erb | 18 +- 50 files changed, 3396 insertions(+), 182 deletions(-) create mode 100644 api/snapshot.go create mode 100644 api/snapshot_test.go create mode 100644 command/agent/snapshot_endpoint.go create mode 100644 command/agent/snapshot_endpoint_test.go create mode 100644 command/snapshot_command.go create mode 100644 command/snapshot_command_test.go create mode 100644 command/snapshot_restore.go create mode 100644 command/snapshot_restore_test.go create mode 100644 command/snapshot_save.go create mode 100644 command/snapshot_save_test.go create mode 100644 consul/snapshot/archive.go create mode 100644 consul/snapshot/archive_test.go create mode 100644 consul/snapshot/snapshot.go create mode 100644 consul/snapshot/snapshot_test.go create mode 100644 consul/snapshot_endpoint.go create mode 100644 consul/snapshot_endpoint_test.go create mode 100644 consul/structs/snapshot.go create mode 100644 test/snapshot/corrupt-meta.tar create mode 100644 test/snapshot/corrupt-sha.tar create mode 100644 test/snapshot/corrupt-state.tar create mode 100644 test/snapshot/empty.tar create mode 100644 test/snapshot/extra.tar create mode 100644 test/snapshot/missing-meta.tar create mode 100644 test/snapshot/missing-sha.tar create mode 100644 test/snapshot/missing-state.tar create mode 100644 website/source/docs/agent/http/snapshot.html.markdown create mode 100644 website/source/docs/commands/snapshot.html.markdown create mode 100644 website/source/docs/commands/snapshot/restore.html.markdown.erb create mode 100644 website/source/docs/commands/snapshot/save.html.markdown.erb 
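For reviewers unfamiliar with the on-disk format: per consul/snapshot/archive.go and consul/snapshot/snapshot.go below, a snapshot is a gzipped tar archive holding meta.json, state.bin, and SHA256SUMS. A rough sketch, using only the standard library (not part of the patch), of listing the entries in a saved snapshot:

```go
package main

import (
	"archive/tar"
	"compress/gzip"
	"fmt"
	"io"
	"log"
	"os"
)

func main() {
	f, err := os.Open("backup.snap") // assumed file name
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Snapshots are gzip-compressed tar archives.
	gz, err := gzip.NewReader(f)
	if err != nil {
		log.Fatal(err)
	}
	defer gz.Close()

	// Walk the entries; a valid archive holds exactly meta.json,
	// state.bin, and SHA256SUMS.
	tr := tar.NewReader(gz)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s (%d bytes)\n", hdr.Name, hdr.Size)
	}
}
```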
diff --git a/acl/acl.go b/acl/acl.go index f13dc5b56..f0180d60e 100644 --- a/acl/acl.go +++ b/acl/acl.go @@ -86,6 +86,9 @@ type ACL interface { // ACLModify checks for permission to manipulate ACLs ACLModify() bool + + // Snapshot checks for permission to take and restore snapshots. + Snapshot() bool } // StaticACL is used to implement a base ACL policy. It either @@ -156,6 +159,10 @@ func (s *StaticACL) ACLModify() bool { return s.allowManage } +func (s *StaticACL) Snapshot() bool { + return s.allowManage +} + // AllowAll returns an ACL rule that allows all operations func AllowAll() ACL { return allowAll @@ -474,3 +481,8 @@ func (p *PolicyACL) ACLList() bool { func (p *PolicyACL) ACLModify() bool { return p.parent.ACLModify() } + +// Snapshot checks if taking and restoring snapshots is allowed. +func (p *PolicyACL) Snapshot() bool { + return p.parent.Snapshot() +} diff --git a/acl/acl_test.go b/acl/acl_test.go index aa1c7972d..c6ce5789a 100644 --- a/acl/acl_test.go +++ b/acl/acl_test.go @@ -77,6 +77,9 @@ func TestStaticACL(t *testing.T) { if all.ACLModify() { t.Fatalf("should not allow") } + if all.Snapshot() { + t.Fatalf("should not allow") + } if none.KeyRead("foobar") { t.Fatalf("should not allow") @@ -126,6 +129,9 @@ func TestStaticACL(t *testing.T) { if none.ACLModify() { t.Fatalf("should not allow") } + if none.Snapshot() { + t.Fatalf("should not allow") + } if !manage.KeyRead("foobar") { t.Fatalf("should allow") @@ -169,6 +175,9 @@ func TestStaticACL(t *testing.T) { if !manage.ACLModify() { t.Fatalf("should allow") } + if !manage.Snapshot() { + t.Fatalf("should allow") + } } func TestPolicyACL(t *testing.T) { @@ -495,6 +504,9 @@ func TestPolicyACL_Parent(t *testing.T) { if acl.ACLModify() { t.Fatalf("should not allow") } + if acl.Snapshot() { + t.Fatalf("should not allow") + } } func TestPolicyACL_Keyring(t *testing.T) { diff --git a/api/snapshot.go b/api/snapshot.go new file mode 100644 index 000000000..e902377dd --- /dev/null +++ b/api/snapshot.go @@ -0,0 +1,47 @@ +package api + +import ( + "io" +) + +// Snapshot can be used to query the /v1/snapshot endpoint to take snapshots of +// Consul's internal state and restore snapshots for disaster recovery. +type Snapshot struct { + c *Client +} + +// Snapshot returns a handle that exposes the snapshot endpoints. +func (c *Client) Snapshot() *Snapshot { + return &Snapshot{c} +} + +// Save requests a new snapshot and provides an io.ReadCloser with the snapshot +// data to save. If this doesn't return an error, then it's the responsibility +// of the caller to close it. Only a subset of the QueryOptions are supported: +// Datacenter, AllowStale, and Token. +func (s *Snapshot) Save(q *QueryOptions) (io.ReadCloser, *QueryMeta, error) { + r := s.c.newRequest("GET", "/v1/snapshot") + r.setQueryOptions(q) + + rtt, resp, err := requireOK(s.c.doRequest(r)) + if err != nil { + return nil, nil, err + } + + qm := &QueryMeta{} + parseQueryMeta(resp, qm) + qm.RequestTime = rtt + return resp.Body, qm, nil +} + +// Restore streams in an existing snapshot and attempts to restore it. 
+func (s *Snapshot) Restore(q *WriteOptions, in io.Reader) error { + r := s.c.newRequest("PUT", "/v1/snapshot") + r.body = in + r.setWriteOptions(q) + _, _, err := requireOK(s.c.doRequest(r)) + if err != nil { + return err + } + return nil +} diff --git a/api/snapshot_test.go b/api/snapshot_test.go new file mode 100644 index 000000000..7d639f891 --- /dev/null +++ b/api/snapshot_test.go @@ -0,0 +1,134 @@ +package api + +import ( + "bytes" + "strings" + "testing" +) + +func TestSnapshot(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + // Place an initial key into the store. + kv := c.KV() + key := &KVPair{Key: testKey(), Value: []byte("hello")} + if _, err := kv.Put(key, nil); err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure it reads back. + pair, _, err := kv.Get(key.Key, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if pair == nil { + t.Fatalf("expected value: %#v", pair) + } + if !bytes.Equal(pair.Value, []byte("hello")) { + t.Fatalf("unexpected value: %#v", pair) + } + + // Take a snapshot. + snapshot := c.Snapshot() + snap, qm, err := snapshot.Save(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + defer snap.Close() + + // Sanity check the query metadata. + if qm.LastIndex == 0 || !qm.KnownLeader || + qm.RequestTime == 0 { + t.Fatalf("bad: %v", qm) + } + + // Overwrite the key's value. + key.Value = []byte("goodbye") + if _, err := kv.Put(key, nil); err != nil { + t.Fatalf("err: %v", err) + } + + // Read the key back and look for the new value. + pair, _, err = kv.Get(key.Key, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if pair == nil { + t.Fatalf("expected value: %#v", pair) + } + if !bytes.Equal(pair.Value, []byte("goodbye")) { + t.Fatalf("unexpected value: %#v", pair) + } + + // Restore the snapshot. + if err := snapshot.Restore(nil, snap); err != nil { + t.Fatalf("err: %v", err) + } + + // Read the key back and look for the original value. + pair, _, err = kv.Get(key.Key, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if pair == nil { + t.Fatalf("expected value: %#v", pair) + } + if !bytes.Equal(pair.Value, []byte("hello")) { + t.Fatalf("unexpected value: %#v", pair) + } +} + +func TestSnapshot_Options(t *testing.T) { + t.Parallel() + c, s := makeACLClient(t) + defer s.Stop() + + // Try to take a snapshot with a bad token. + snapshot := c.Snapshot() + _, _, err := snapshot.Save(&QueryOptions{Token: "anonymous"}) + if err == nil || !strings.Contains(err.Error(), "Permission denied") { + t.Fatalf("err: %v", err) + } + + // Now try an unknown DC. + _, _, err = snapshot.Save(&QueryOptions{Datacenter: "nope"}) + if err == nil || !strings.Contains(err.Error(), "No path to datacenter") { + t.Fatalf("err: %v", err) + } + + // This should work with a valid token. + snap, _, err := snapshot.Save(&QueryOptions{Token: "root"}) + if err != nil { + t.Fatalf("err: %v", err) + } + defer snap.Close() + + // This should work with a stale snapshot. This doesn't have good feedback + // that the stale option was sent, but it makes sure nothing bad happens. + snap, _, err = snapshot.Save(&QueryOptions{Token: "root", AllowStale: true}) + if err != nil { + t.Fatalf("err: %v", err) + } + defer snap.Close() + + // Try to restore a snapshot with a bad token. + null := bytes.NewReader([]byte("")) + err = snapshot.Restore(&WriteOptions{Token: "anonymous"}, null) + if err == nil || !strings.Contains(err.Error(), "Permission denied") { + t.Fatalf("err: %v", err) + } + + // Now try an unknown DC.
+ null = bytes.NewReader([]byte("")) + err = snapshot.Restore(&WriteOptions{Datacenter: "nope"}, null) + if err == nil || !strings.Contains(err.Error(), "No path to datacenter") { + t.Fatalf("err: %v", err) + } + + // This should work. + if err := snapshot.Restore(&WriteOptions{Token: "root"}, snap); err != nil { + t.Fatalf("err: %v", err) + } +} diff --git a/command/agent/agent.go b/command/agent/agent.go index 75eee27bc..4349f1fa4 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -469,6 +469,19 @@ func (a *Agent) RPC(method string, args interface{}, reply interface{}) error { return a.client.RPC(method, args, reply) } +// SnapshotRPC performs the requested snapshot RPC against the Consul server in +// a streaming manner. The contents of in will be read and passed along as the +// payload, and the response message will determine the error status, and any +// return payload will be written to out. +func (a *Agent) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, + replyFn consul.SnapshotReplyFn) error { + + if a.server != nil { + return a.server.SnapshotRPC(args, in, out, replyFn) + } + return a.client.SnapshotRPC(args, in, out, replyFn) +} + // Leave is used to prepare the agent for a graceful shutdown func (a *Agent) Leave() error { if a.server != nil { diff --git a/command/agent/http.go b/command/agent/http.go index 4ce51ff4b..082fc039b 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -231,64 +231,7 @@ func (s *HTTPServer) handleFuncMetrics(pattern string, handler func(http.Respons func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/", s.Index) - s.handleFuncMetrics("/v1/status/leader", s.wrap(s.StatusLeader)) - s.handleFuncMetrics("/v1/status/peers", s.wrap(s.StatusPeers)) - - s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration)) - s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer)) - - s.handleFuncMetrics("/v1/catalog/register", s.wrap(s.CatalogRegister)) - s.handleFuncMetrics("/v1/catalog/deregister", s.wrap(s.CatalogDeregister)) - s.handleFuncMetrics("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters)) - s.handleFuncMetrics("/v1/catalog/nodes", s.wrap(s.CatalogNodes)) - s.handleFuncMetrics("/v1/catalog/services", s.wrap(s.CatalogServices)) - s.handleFuncMetrics("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes)) - s.handleFuncMetrics("/v1/catalog/node/", s.wrap(s.CatalogNodeServices)) - - if !s.agent.config.DisableCoordinates { - s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters)) - s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes)) - } else { - s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled)) - s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled)) - } - - s.handleFuncMetrics("/v1/health/node/", s.wrap(s.HealthNodeChecks)) - s.handleFuncMetrics("/v1/health/checks/", s.wrap(s.HealthServiceChecks)) - s.handleFuncMetrics("/v1/health/state/", s.wrap(s.HealthChecksInState)) - s.handleFuncMetrics("/v1/health/service/", s.wrap(s.HealthServiceNodes)) - - s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf)) - s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance)) - s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices)) - s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks)) - s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers)) - s.handleFuncMetrics("/v1/agent/join/", s.wrap(s.AgentJoin)) 
- s.handleFuncMetrics("/v1/agent/force-leave/", s.wrap(s.AgentForceLeave)) - - s.handleFuncMetrics("/v1/agent/check/register", s.wrap(s.AgentRegisterCheck)) - s.handleFuncMetrics("/v1/agent/check/deregister/", s.wrap(s.AgentDeregisterCheck)) - s.handleFuncMetrics("/v1/agent/check/pass/", s.wrap(s.AgentCheckPass)) - s.handleFuncMetrics("/v1/agent/check/warn/", s.wrap(s.AgentCheckWarn)) - s.handleFuncMetrics("/v1/agent/check/fail/", s.wrap(s.AgentCheckFail)) - s.handleFuncMetrics("/v1/agent/check/update/", s.wrap(s.AgentCheckUpdate)) - - s.handleFuncMetrics("/v1/agent/service/register", s.wrap(s.AgentRegisterService)) - s.handleFuncMetrics("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService)) - s.handleFuncMetrics("/v1/agent/service/maintenance/", s.wrap(s.AgentServiceMaintenance)) - - s.handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire)) - s.handleFuncMetrics("/v1/event/list", s.wrap(s.EventList)) - - s.handleFuncMetrics("/v1/kv/", s.wrap(s.KVSEndpoint)) - - s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate)) - s.handleFuncMetrics("/v1/session/destroy/", s.wrap(s.SessionDestroy)) - s.handleFuncMetrics("/v1/session/renew/", s.wrap(s.SessionRenew)) - s.handleFuncMetrics("/v1/session/info/", s.wrap(s.SessionGet)) - s.handleFuncMetrics("/v1/session/node/", s.wrap(s.SessionsForNode)) - s.handleFuncMetrics("/v1/session/list", s.wrap(s.SessionList)) - + // API V1. if s.agent.config.ACLDatacenter != "" { s.handleFuncMetrics("/v1/acl/create", s.wrap(s.ACLCreate)) s.handleFuncMetrics("/v1/acl/update", s.wrap(s.ACLUpdate)) @@ -306,12 +249,62 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.handleFuncMetrics("/v1/acl/list", s.wrap(aclDisabled)) s.handleFuncMetrics("/v1/acl/replication", s.wrap(aclDisabled)) } - + s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf)) + s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance)) + s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices)) + s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks)) + s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers)) + s.handleFuncMetrics("/v1/agent/join/", s.wrap(s.AgentJoin)) + s.handleFuncMetrics("/v1/agent/force-leave/", s.wrap(s.AgentForceLeave)) + s.handleFuncMetrics("/v1/agent/check/register", s.wrap(s.AgentRegisterCheck)) + s.handleFuncMetrics("/v1/agent/check/deregister/", s.wrap(s.AgentDeregisterCheck)) + s.handleFuncMetrics("/v1/agent/check/pass/", s.wrap(s.AgentCheckPass)) + s.handleFuncMetrics("/v1/agent/check/warn/", s.wrap(s.AgentCheckWarn)) + s.handleFuncMetrics("/v1/agent/check/fail/", s.wrap(s.AgentCheckFail)) + s.handleFuncMetrics("/v1/agent/check/update/", s.wrap(s.AgentCheckUpdate)) + s.handleFuncMetrics("/v1/agent/service/register", s.wrap(s.AgentRegisterService)) + s.handleFuncMetrics("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService)) + s.handleFuncMetrics("/v1/agent/service/maintenance/", s.wrap(s.AgentServiceMaintenance)) + s.handleFuncMetrics("/v1/catalog/register", s.wrap(s.CatalogRegister)) + s.handleFuncMetrics("/v1/catalog/deregister", s.wrap(s.CatalogDeregister)) + s.handleFuncMetrics("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters)) + s.handleFuncMetrics("/v1/catalog/nodes", s.wrap(s.CatalogNodes)) + s.handleFuncMetrics("/v1/catalog/services", s.wrap(s.CatalogServices)) + s.handleFuncMetrics("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes)) + s.handleFuncMetrics("/v1/catalog/node/", s.wrap(s.CatalogNodeServices)) + if !s.agent.config.DisableCoordinates { + 
s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters)) + s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes)) + } else { + s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled)) + s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled)) + } + s.handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire)) + s.handleFuncMetrics("/v1/event/list", s.wrap(s.EventList)) + s.handleFuncMetrics("/v1/health/node/", s.wrap(s.HealthNodeChecks)) + s.handleFuncMetrics("/v1/health/checks/", s.wrap(s.HealthServiceChecks)) + s.handleFuncMetrics("/v1/health/state/", s.wrap(s.HealthChecksInState)) + s.handleFuncMetrics("/v1/health/service/", s.wrap(s.HealthServiceNodes)) + s.handleFuncMetrics("/v1/internal/ui/nodes", s.wrap(s.UINodes)) + s.handleFuncMetrics("/v1/internal/ui/node/", s.wrap(s.UINodeInfo)) + s.handleFuncMetrics("/v1/internal/ui/services", s.wrap(s.UIServices)) + s.handleFuncMetrics("/v1/kv/", s.wrap(s.KVSEndpoint)) + s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration)) + s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer)) s.handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral)) s.handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific)) - + s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate)) + s.handleFuncMetrics("/v1/session/destroy/", s.wrap(s.SessionDestroy)) + s.handleFuncMetrics("/v1/session/renew/", s.wrap(s.SessionRenew)) + s.handleFuncMetrics("/v1/session/info/", s.wrap(s.SessionGet)) + s.handleFuncMetrics("/v1/session/node/", s.wrap(s.SessionsForNode)) + s.handleFuncMetrics("/v1/session/list", s.wrap(s.SessionList)) + s.handleFuncMetrics("/v1/status/leader", s.wrap(s.StatusLeader)) + s.handleFuncMetrics("/v1/status/peers", s.wrap(s.StatusPeers)) + s.handleFuncMetrics("/v1/snapshot", s.wrap(s.Snapshot)) s.handleFuncMetrics("/v1/txn", s.wrap(s.Txn)) + // Debug endpoints. if enableDebug { s.handleFuncMetrics("/debug/pprof/", pprof.Index) s.handleFuncMetrics("/debug/pprof/cmdline", pprof.Cmdline) @@ -326,10 +319,6 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(assetFS()))) } - // API's are under /internal/ui/ to avoid conflict - s.handleFuncMetrics("/v1/internal/ui/nodes", s.wrap(s.UINodes)) - s.handleFuncMetrics("/v1/internal/ui/node/", s.wrap(s.UINodeInfo)) - s.handleFuncMetrics("/v1/internal/ui/services", s.wrap(s.UIServices)) } // wrap is used to wrap functions to make them more convenient diff --git a/command/agent/snapshot_endpoint.go b/command/agent/snapshot_endpoint.go new file mode 100644 index 000000000..06bf150d7 --- /dev/null +++ b/command/agent/snapshot_endpoint.go @@ -0,0 +1,50 @@ +package agent + +import ( + "bytes" + "net/http" + + "github.com/hashicorp/consul/consul/structs" +) + +// Snapshot handles requests to take and restore snapshots. This uses a special +// mechanism to make the RPC since we potentially stream large amounts of data +// as part of these requests. +func (s *HTTPServer) Snapshot(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + var args structs.SnapshotRequest + s.parseDC(req, &args.Datacenter) + s.parseToken(req, &args.Token) + if _, ok := req.URL.Query()["stale"]; ok { + args.AllowStale = true + } + + switch req.Method { + case "GET": + args.Op = structs.SnapshotSave + + // Headers need to go out before we stream the body. 
+ replyFn := func(reply *structs.SnapshotResponse) error { + setMeta(resp, &reply.QueryMeta) + return nil + } + + // Don't bother sending any request body through since it will + // be ignored. + var null bytes.Buffer + if err := s.agent.SnapshotRPC(&args, &null, resp, replyFn); err != nil { + return nil, err + } + return nil, nil + + case "PUT": + args.Op = structs.SnapshotRestore + if err := s.agent.SnapshotRPC(&args, req.Body, resp, nil); err != nil { + return nil, err + } + return nil, nil + + default: + resp.WriteHeader(http.StatusMethodNotAllowed) + return nil, nil + } +} diff --git a/command/agent/snapshot_endpoint_test.go b/command/agent/snapshot_endpoint_test.go new file mode 100644 index 000000000..f2a99a645 --- /dev/null +++ b/command/agent/snapshot_endpoint_test.go @@ -0,0 +1,142 @@ +package agent + +import ( + "bytes" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func TestSnapshot(t *testing.T) { + var snap io.Reader + httpTest(t, func(srv *HTTPServer) { + body := bytes.NewBuffer(nil) + req, err := http.NewRequest("GET", "/v1/snapshot?token=root", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + _, err = srv.Snapshot(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + snap = resp.Body + + header := resp.Header().Get("X-Consul-Index") + if header == "" { + t.Fatalf("bad: %v", header) + } + header = resp.Header().Get("X-Consul-KnownLeader") + if header != "true" { + t.Fatalf("bad: %v", header) + } + header = resp.Header().Get("X-Consul-LastContact") + if header != "0" { + t.Fatalf("bad: %v", header) + } + }) + + httpTest(t, func(srv *HTTPServer) { + req, err := http.NewRequest("PUT", "/v1/snapshot?token=root", snap) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + _, err = srv.Snapshot(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + }) +} + +func TestSnapshot_Options(t *testing.T) { + for _, method := range []string{"GET", "PUT"} { + httpTest(t, func(srv *HTTPServer) { + body := bytes.NewBuffer(nil) + req, err := http.NewRequest(method, "/v1/snapshot?token=anonymous", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + _, err = srv.Snapshot(resp, req) + if err == nil || !strings.Contains(err.Error(), "Permission denied") { + t.Fatalf("err: %v", err) + } + }) + + httpTest(t, func(srv *HTTPServer) { + body := bytes.NewBuffer(nil) + req, err := http.NewRequest(method, "/v1/snapshot?dc=nope", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + _, err = srv.Snapshot(resp, req) + if err == nil || !strings.Contains(err.Error(), "No path to datacenter") { + t.Fatalf("err: %v", err) + } + }) + + httpTest(t, func(srv *HTTPServer) { + body := bytes.NewBuffer(nil) + req, err := http.NewRequest(method, "/v1/snapshot?token=root&stale", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + _, err = srv.Snapshot(resp, req) + if method == "GET" { + if err != nil { + t.Fatalf("err: %v", err) + } + } else { + if err == nil || !strings.Contains(err.Error(), "stale not allowed") { + t.Fatalf("err: %v", err) + } + } + }) + } +} + +func TestSnapshot_BadMethods(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + body := bytes.NewBuffer(nil) + req, err := http.NewRequest("POST", "/v1/snapshot", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + _, err = srv.Snapshot(resp, req) + if err != nil { + t.Fatalf("err: %v", 
err) + } + if resp.Code != 405 { + t.Fatalf("bad code: %d", resp.Code) + } + }) + + httpTest(t, func(srv *HTTPServer) { + body := bytes.NewBuffer(nil) + req, err := http.NewRequest("DELETE", "/v1/snapshot", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + _, err = srv.Snapshot(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + if resp.Code != 405 { + t.Fatalf("bad code: %d", resp.Code) + } + }) +} diff --git a/command/snapshot_command.go b/command/snapshot_command.go new file mode 100644 index 000000000..1d1837243 --- /dev/null +++ b/command/snapshot_command.go @@ -0,0 +1,48 @@ +package command + +import ( + "strings" + + "github.com/mitchellh/cli" +) + +// SnapshotCommand is a Command implementation that just shows help for +// the subcommands nested below it. +type SnapshotCommand struct { + Ui cli.Ui +} + +func (c *SnapshotCommand) Run(args []string) int { + return cli.RunResultHelp +} + +func (c *SnapshotCommand) Help() string { + helpText := ` +Usage: consul snapshot [options] [args] + + This command has subcommands for saving and restoring the state of the Consul + servers for disaster recovery. These are atomic, point-in-time snapshots which + include key/value entries, service catalog, prepared queries, sessions, and + ACLs. + + If ACLs are enabled, a management token must be supplied in order to perform + snapshot operations. + + Create a snapshot: + + $ consul snapshot save backup.snap + + Restore a snapshot: + + $ consul snapshot restore backup.snap + + + For more examples, ask for subcommand help or view the documentation. + +` + return strings.TrimSpace(helpText) +} + +func (c *SnapshotCommand) Synopsis() string { + return "Saves and restores snapshots of Consul server state" +} diff --git a/command/snapshot_command_test.go b/command/snapshot_command_test.go new file mode 100644 index 000000000..4e0807b56 --- /dev/null +++ b/command/snapshot_command_test.go @@ -0,0 +1,15 @@ +package command + +import ( + "testing" + + "github.com/mitchellh/cli" +) + +func TestSnapshotCommand_implements(t *testing.T) { + var _ cli.Command = &SnapshotCommand{} +} + +func TestSnapshotCommand_noTabs(t *testing.T) { + assertNoTabs(t, new(SnapshotCommand)) +} diff --git a/command/snapshot_restore.go b/command/snapshot_restore.go new file mode 100644 index 000000000..a4b08288e --- /dev/null +++ b/command/snapshot_restore.go @@ -0,0 +1,103 @@ +package command + +import ( + "flag" + "fmt" + "os" + "strings" + + "github.com/hashicorp/consul/api" + "github.com/mitchellh/cli" +) + +// SnapshotRestoreCommand is a Command implementation that is used to restore +// the state of the Consul servers for disaster recovery. +type SnapshotRestoreCommand struct { + Ui cli.Ui +} + +func (c *SnapshotRestoreCommand) Help() string { + helpText := ` +Usage: consul snapshot restore [options] FILE + + Restores an atomic, point-in-time snapshot of the state of the Consul servers + which includes key/value entries, service catalog, prepared queries, sessions, + and ACLs. + + Restores involve a potentially dangerous low-level Raft operation that is not + designed to handle server failures during a restore. This command is primarily + intended to be used when recovering from a disaster, restoring into a fresh + cluster of Consul servers. + + If ACLs are enabled, a management token must be supplied in order to perform + snapshot operations. 
+ + To restore a snapshot from the file "backup.snap": + + $ consul snapshot restore backup.snap + + For a full list of options and examples, please see the Consul documentation. + +` + apiOptsText + + return strings.TrimSpace(helpText) +} + +func (c *SnapshotRestoreCommand) Run(args []string) int { + cmdFlags := flag.NewFlagSet("get", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } + datacenter := cmdFlags.String("datacenter", "", "") + token := cmdFlags.String("token", "", "") + httpAddr := HTTPAddrFlag(cmdFlags) + if err := cmdFlags.Parse(args); err != nil { + return 1 + } + + var file string + + args = cmdFlags.Args() + switch len(args) { + case 0: + c.Ui.Error("Missing FILE argument") + return 1 + case 1: + file = args[0] + default: + c.Ui.Error(fmt.Sprintf("Too many arguments (expected 1, got %d)", len(args))) + return 1 + } + + // Create and test the HTTP client + conf := api.DefaultConfig() + conf.Address = *httpAddr + conf.Token = *token + client, err := api.NewClient(conf) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) + return 1 + } + + // Open the file. + f, err := os.Open(file) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error opening snapshot file: %s", err)) + return 1 + } + defer f.Close() + + // Restore the snapshot. + err = client.Snapshot().Restore(&api.WriteOptions{ + Datacenter: *datacenter, + }, f) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error restoring snapshot: %s", err)) + return 1 + } + + c.Ui.Info("Restored snapshot") + return 0 +} + +func (c *SnapshotRestoreCommand) Synopsis() string { + return "Restores snapshot of Consul server state" +} diff --git a/command/snapshot_restore_test.go b/command/snapshot_restore_test.go new file mode 100644 index 000000000..e9f1b4ae6 --- /dev/null +++ b/command/snapshot_restore_test.go @@ -0,0 +1,103 @@ +package command + +import ( + "io" + "io/ioutil" + "os" + "path" + "strings" + "testing" + + "github.com/mitchellh/cli" +) + +func TestSnapshotRestoreCommand_implements(t *testing.T) { + var _ cli.Command = &SnapshotRestoreCommand{} +} + +func TestSnapshotRestoreCommand_noTabs(t *testing.T) { + assertNoTabs(t, new(SnapshotRestoreCommand)) +} + +func TestSnapshotRestoreCommand_Validation(t *testing.T) { + ui := new(cli.MockUi) + c := &SnapshotRestoreCommand{Ui: ui} + + cases := map[string]struct { + args []string + output string + }{ + "no file": { + []string{}, + "Missing FILE argument", + }, + "extra args": { + []string{"foo", "bar", "baz"}, + "Too many arguments", + }, + } + + for name, tc := range cases { + // Ensure our buffer is always clear + if ui.ErrorWriter != nil { + ui.ErrorWriter.Reset() + } + if ui.OutputWriter != nil { + ui.OutputWriter.Reset() + } + + code := c.Run(tc.args) + if code == 0 { + t.Errorf("%s: expected non-zero exit", name) + } + + output := ui.ErrorWriter.String() + if !strings.Contains(output, tc.output) { + t.Errorf("%s: expected %q to contain %q", name, output, tc.output) + } + } +} + +func TestSnapshotRestoreCommand_Run(t *testing.T) { + srv, client := testAgentWithAPIClient(t) + defer srv.Shutdown() + waitForLeader(t, srv.httpAddr) + + ui := new(cli.MockUi) + c := &SnapshotRestoreCommand{Ui: ui} + + dir, err := ioutil.TempDir("", "snapshot") + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dir) + + file := path.Join(dir, "backup.tgz") + args := []string{ + "-http-addr=" + srv.httpAddr, + file, + } + + f, err := os.Create(file) + if err != nil { + t.Fatalf("err: %v", err) + } + + snap, _, err :=
client.Snapshot().Save(nil) + if err != nil { + f.Close() + t.Fatalf("err: %v", err) + } + if _, err := io.Copy(f, snap); err != nil { + f.Close() + t.Fatalf("err: %v", err) + } + if err := f.Close(); err != nil { + t.Fatalf("err: %v", err) + } + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } +} diff --git a/command/snapshot_save.go b/command/snapshot_save.go new file mode 100644 index 000000000..ed7bc0324 --- /dev/null +++ b/command/snapshot_save.go @@ -0,0 +1,132 @@ +package command + +import ( + "flag" + "fmt" + "io" + "os" + "strings" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/consul/snapshot" + "github.com/mitchellh/cli" +) + +// SnapshotSaveCommand is a Command implementation that is used to save the +// state of the Consul servers for disaster recovery. +type SnapshotSaveCommand struct { + Ui cli.Ui +} + +func (c *SnapshotSaveCommand) Help() string { + helpText := ` +Usage: consul snapshot save [options] FILE + + Retrieves an atomic, point-in-time snapshot of the state of the Consul servers + which includes key/value entries, service catalog, prepared queries, sessions, + and ACLs. + + If ACLs are enabled, a management token must be supplied in order to perform + snapshot operations. + + To create a snapshot from the leader server and save it to "backup.snap": + + $ consul snapshot save backup.snap + + To create a potentially stale snapshot from any available server (useful if no + leader is available): + + $ consul snapshot save -stale backup.snap + + For a full list of options and examples, please see the Consul documentation. + +` + apiOptsText + + return strings.TrimSpace(helpText) +} + +func (c *SnapshotSaveCommand) Run(args []string) int { + cmdFlags := flag.NewFlagSet("get", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } + datacenter := cmdFlags.String("datacenter", "", "") + token := cmdFlags.String("token", "", "") + stale := cmdFlags.Bool("stale", false, "") + httpAddr := HTTPAddrFlag(cmdFlags) + if err := cmdFlags.Parse(args); err != nil { + return 1 + } + + var file string + + args = cmdFlags.Args() + switch len(args) { + case 0: + c.Ui.Error("Missing FILE argument") + return 1 + case 1: + file = args[0] + default: + c.Ui.Error(fmt.Sprintf("Too many arguments (expected 1, got %d)", len(args))) + return 1 + } + + // Create and test the HTTP client + conf := api.DefaultConfig() + conf.Address = *httpAddr + conf.Token = *token + client, err := api.NewClient(conf) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) + return 1 + } + + // Take the snapshot. + snap, qm, err := client.Snapshot().Save(&api.QueryOptions{ + Datacenter: *datacenter, + AllowStale: *stale, + }) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error saving snapshot: %s", err)) + return 1 + } + defer snap.Close() + + // Save the file. + f, err := os.Create(file) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error creating snapshot file: %s", err)) + return 1 + } + if _, err := io.Copy(f, snap); err != nil { + f.Close() + c.Ui.Error(fmt.Sprintf("Error writing snapshot file: %s", err)) + return 1 + } + if err := f.Close(); err != nil { + c.Ui.Error(fmt.Sprintf("Error closing snapshot file after writing: %s", err)) + return 1 + } + + // Read it back to verify. 
+ f, err = os.Open(file) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error opening snapshot file for verify: %s", err)) + return 1 + } + if err := snapshot.Verify(f); err != nil { + f.Close() + c.Ui.Error(fmt.Sprintf("Error verifying snapshot file: %s", err)) + return 1 + } + if err := f.Close(); err != nil { + c.Ui.Error(fmt.Sprintf("Error closing snapshot file after verify: %s", err)) + return 1 + } + + c.Ui.Info(fmt.Sprintf("Saved and verified snapshot to index %d", qm.LastIndex)) + return 0 +} + +func (c *SnapshotSaveCommand) Synopsis() string { + return "Saves snapshot of Consul server state" +} diff --git a/command/snapshot_save_test.go b/command/snapshot_save_test.go new file mode 100644 index 000000000..e1eeb87c6 --- /dev/null +++ b/command/snapshot_save_test.go @@ -0,0 +1,94 @@ +package command + +import ( + "io/ioutil" + "os" + "path" + "strings" + "testing" + + "github.com/mitchellh/cli" +) + +func TestSnapshotSaveCommand_implements(t *testing.T) { + var _ cli.Command = &SnapshotSaveCommand{} +} + +func TestSnapshotSaveCommand_noTabs(t *testing.T) { + assertNoTabs(t, new(SnapshotSaveCommand)) +} + +func TestSnapshotSaveCommand_Validation(t *testing.T) { + ui := new(cli.MockUi) + c := &SnapshotSaveCommand{Ui: ui} + + cases := map[string]struct { + args []string + output string + }{ + "no file": { + []string{}, + "Missing FILE argument", + }, + "extra args": { + []string{"foo", "bar", "baz"}, + "Too many arguments", + }, + } + + for name, tc := range cases { + // Ensure our buffer is always clear + if ui.ErrorWriter != nil { + ui.ErrorWriter.Reset() + } + if ui.OutputWriter != nil { + ui.OutputWriter.Reset() + } + + code := c.Run(tc.args) + if code == 0 { + t.Errorf("%s: expected non-zero exit", name) + } + + output := ui.ErrorWriter.String() + if !strings.Contains(output, tc.output) { + t.Errorf("%s: expected %q to contain %q", name, output, tc.output) + } + } +} + +func TestSnapshotSaveCommand_Run(t *testing.T) { + srv, client := testAgentWithAPIClient(t) + defer srv.Shutdown() + waitForLeader(t, srv.httpAddr) + + ui := new(cli.MockUi) + c := &SnapshotSaveCommand{Ui: ui} + + dir, err := ioutil.TempDir("", "snapshot") + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dir) + + file := path.Join(dir, "backup.tgz") + args := []string{ + "-http-addr=" + srv.httpAddr, + file, + } + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. 
%#v", code, ui.ErrorWriter.String()) + } + + f, err := os.Open(file) + if err != nil { + t.Fatalf("err: %v", err) + } + defer f.Close() + + if err := client.Snapshot().Restore(nil, f); err != nil { + t.Fatalf("err: %v", err) + } +} diff --git a/commands.go b/commands.go index 41f60b3c0..8e45c2254 100644 --- a/commands.go +++ b/commands.go @@ -151,6 +151,24 @@ func init() { }, nil }, + "snapshot": func() (cli.Command, error) { + return &command.SnapshotCommand{ + Ui: ui, + }, nil + }, + + "snapshot restore": func() (cli.Command, error) { + return &command.SnapshotRestoreCommand{ + Ui: ui, + }, nil + }, + + "snapshot save": func() (cli.Command, error) { + return &command.SnapshotSaveCommand{ + Ui: ui, + }, nil + }, + "version": func() (cli.Command, error) { return &command.VersionCommand{ HumanVersion: GetHumanVersion(), diff --git a/consul/client.go b/consul/client.go index bd5132c66..92c4919a2 100644 --- a/consul/client.go +++ b/consul/client.go @@ -2,6 +2,7 @@ package consul import ( "fmt" + "io" "log" "os" "path/filepath" @@ -337,6 +338,51 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { return nil } +// SnapshotReplyFn gets a peek at the reply before the snapshot streams, which +// is useful for setting headers. +type SnapshotReplyFn func(reply *structs.SnapshotResponse) error + +// SnapshotRPC sends the snapshot request to one of the servers, reading from +// the streaming input and writing to the streaming output depending on the +// operation. +func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, + replyFn SnapshotReplyFn) error { + + // Locate a server to make the request to. + server := c.servers.FindServer() + if server == nil { + return structs.ErrNoServers + } + + // Request the operation. + var reply structs.SnapshotResponse + snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.Addr, args, in, &reply) + if err != nil { + return err + } + defer func() { + if err := snap.Close(); err != nil { + c.logger.Printf("[WARN] consul: Failed closing snapshot stream: %v", err) + } + }() + + // Let the caller peek at the reply. + if replyFn != nil { + if err := replyFn(&reply); err != nil { + return err + } + } + + // Stream the snapshot. + if out != nil { + if _, err := io.Copy(out, snap); err != nil { + return fmt.Errorf("failed to stream snapshot: %v", err) + } + } + + return nil +} + // Stats is used to return statistics for debugging and insight // for various sub-systems func (c *Client) Stats() map[string]map[string]string { diff --git a/consul/client_test.go b/consul/client_test.go index 349633c1f..8ea419de5 100644 --- a/consul/client_test.go +++ b/consul/client_test.go @@ -1,6 +1,7 @@ package consul import ( + "bytes" "fmt" "net" "os" @@ -360,6 +361,111 @@ func TestClient_RPC_TLS(t *testing.T) { }) } +func TestClient_SnapshotRPC(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, c1 := testClient(t) + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + // Wait for the leader + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Try to join.
+ addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := c1.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if len(s1.LANMembers()) != 2 || len(c1.LANMembers()) != 2 { + t.Fatalf("Server has %v of %v expected members; Client has %v of %v expected members.", len(s1.LANMembers()), 2, len(c1.LANMembers()), 2) + } + + // Wait until we've got a healthy server. + testutil.WaitForResult(func() (bool, error) { + return c1.servers.NumServers() == 1, nil + }, func(err error) { + t.Fatalf("expected consul server") + }) + + // Take a snapshot. + var snap bytes.Buffer + args := structs.SnapshotRequest{ + Datacenter: "dc1", + Op: structs.SnapshotSave, + } + if err := c1.SnapshotRPC(&args, bytes.NewReader([]byte("")), &snap, nil); err != nil { + t.Fatalf("err: %v", err) + } + + // Restore a snapshot. + args.Op = structs.SnapshotRestore + if err := c1.SnapshotRPC(&args, &snap, nil, nil); err != nil { + t.Fatalf("err: %v", err) + } +} + +func TestClient_SnapshotRPC_TLS(t *testing.T) { + dir1, conf1 := testServerConfig(t, "a.testco.internal") + conf1.VerifyIncoming = true + conf1.VerifyOutgoing = true + configureTLS(conf1) + s1, err := NewServer(conf1) + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, conf2 := testClientConfig(t, "b.testco.internal") + conf2.VerifyOutgoing = true + configureTLS(conf2) + c1, err := NewClient(conf2) + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + // Wait for the leader + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Try to join. + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := c1.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if len(s1.LANMembers()) != 2 || len(c1.LANMembers()) != 2 { + t.Fatalf("Server has %v of %v expected members; Client has %v of %v expected members.", len(s1.LANMembers()), 2, len(c1.LANMembers()), 2) + } + + // Wait until we've got a healthy server. + testutil.WaitForResult(func() (bool, error) { + return c1.servers.NumServers() == 1, nil + }, func(err error) { + t.Fatalf("expected consul server") + }) + + // Take a snapshot. + var snap bytes.Buffer + args := structs.SnapshotRequest{ + Datacenter: "dc1", + Op: structs.SnapshotSave, + } + if err := c1.SnapshotRPC(&args, bytes.NewReader([]byte("")), &snap, nil); err != nil { + t.Fatalf("err: %v", err) + } + + // Restore a snapshot. + args.Op = structs.SnapshotRestore + if err := c1.SnapshotRPC(&args, &snap, nil, nil); err != nil { + t.Fatalf("err: %v", err) + } +} + func TestClientServer_UserEvent(t *testing.T) { clientOut := make(chan serf.UserEvent, 2) dir1, c1 := testClientWithConfig(t, func(conf *Config) { diff --git a/consul/pool.go b/consul/pool.go index 0906b2827..2002b997a 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -248,18 +248,29 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) return nil, fmt.Errorf("rpc error: lead thread didn't get connection") } -// getNewConn is used to return a new connection -func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, error) { +// HalfCloser is an interface that exposes a TCP half-close. We need this +// because we want to expose the raw TCP connection underlying a TLS one in a +// way that's hard to screw up and use for anything else. 
There's a change +// brewing that will allow us to use the TLS connection for this instead - +// https://go-review.googlesource.com/#/c/25159/. +type HalfCloser interface { + CloseWrite() error +} + +// Dial is used to establish a raw connection to the given server. +func (p *ConnPool) Dial(dc string, addr net.Addr) (net.Conn, HalfCloser, error) { // Try to dial the conn conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second) if err != nil { - return nil, err + return nil, nil, err } // Cast to TCPConn + var hc HalfCloser if tcp, ok := conn.(*net.TCPConn); ok { tcp.SetKeepAlive(true) tcp.SetNoDelay(true) + hc = tcp } // Check if TLS is enabled @@ -267,18 +278,29 @@ func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, err // Switch the connection into TLS mode if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil { conn.Close() - return nil, err + return nil, nil, err } // Wrap the connection in a TLS client tlsConn, err := p.tlsWrap(dc, conn) if err != nil { conn.Close() - return nil, err + return nil, nil, err } conn = tlsConn } + return conn, hc, nil +} + +// getNewConn is used to return a new connection +func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, error) { + // Get a new, raw connection. + conn, _, err := p.Dial(dc, addr) + if err != nil { + return nil, err + } + // Switch the multiplexing based on version var session muxSession if version < 2 { diff --git a/consul/prepared_query_endpoint_test.go b/consul/prepared_query_endpoint_test.go index 49075ed92..d23b627b9 100644 --- a/consul/prepared_query_endpoint_test.go +++ b/consul/prepared_query_endpoint_test.go @@ -459,7 +459,9 @@ func TestPreparedQuery_Apply_ACLDeny(t *testing.T) { } func TestPreparedQuery_Apply_ForwardLeader(t *testing.T) { - dir1, s1 := testServer(t) + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Bootstrap = false + }) defer os.RemoveAll(dir1) defer s1.Shutdown() codec1 := rpcClient(t, s1) diff --git a/consul/rpc.go b/consul/rpc.go index d5a6a694e..315b7f1d2 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -27,6 +27,7 @@ const ( rpcMultiplex // Old Muxado byte, no longer supported. rpcTLS rpcMultiplexV2 + rpcSnapshot ) const ( @@ -119,6 +120,9 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) { case rpcMultiplexV2: s.handleMultiplexV2(conn) + case rpcSnapshot: + s.handleSnapshotConn(conn) + default: s.logger.Printf("[ERR] consul.rpc: unrecognized RPC byte: %v %s", buf[0], logConn(conn)) conn.Close() @@ -167,6 +171,17 @@ func (s *Server) handleConsulConn(conn net.Conn) { } } +// handleSnapshotConn is used to dispatch snapshot saves and restores, which +// stream so don't use the normal RPC mechanism. +func (s *Server) handleSnapshotConn(conn net.Conn) { + go func() { + defer conn.Close() + if err := s.handleSnapshotRequest(conn); err != nil { + s.logger.Printf("[ERR] consul.rpc: Snapshot RPC error: %v %s", err, logConn(conn)) + } + }() +} + // forward is used to forward to a remote DC or to forward to the local leader // Returns a bool of if forwarding was performed, as well as any error func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) { @@ -216,9 +231,9 @@ CHECK_LEADER: return true, structs.ErrNoLeader } -// getLeader returns if the current node is the leader, and if not -// then it returns the leader which is potentially nil if the cluster -// has not yet elected a leader. 
+// getLeader returns if the current node is the leader, and if not then it +// returns the leader which is potentially nil if the cluster has not yet +// elected a leader. func (s *Server) getLeader() (bool, *agent.Server) { // Check if we are the leader if s.IsLeader() { @@ -249,23 +264,29 @@ func (s *Server) forwardLeader(server *agent.Server, method string, args interfa return s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, method, args, reply) } -// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers -func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error { - // Bail if we can't find any servers +// getRemoteServer returns a random server from a remote datacenter. This uses +// the bool parameter to signal that none were available. +func (s *Server) getRemoteServer(dc string) (*agent.Server, bool) { s.remoteLock.RLock() + defer s.remoteLock.RUnlock() servers := s.remoteConsuls[dc] if len(servers) == 0 { - s.remoteLock.RUnlock() + return nil, false + } + + offset := rand.Int31n(int32(len(servers))) + server := servers[offset] + return server, true +} + +// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers +func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error { + server, ok := s.getRemoteServer(dc) + if !ok { s.logger.Printf("[WARN] consul.rpc: RPC request for DC '%s', no path found", dc) return structs.ErrNoDCPath } - // Select a random addr - offset := rand.Int31n(int32(len(servers))) - server := servers[offset] - s.remoteLock.RUnlock() - - // Forward to remote Consul metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1) return s.connPool.RPC(dc, server.Addr, server.Version, method, args, reply) } diff --git a/consul/server.go b/consul/server.go index ed9a7f788..f4a5c90d9 100644 --- a/consul/server.go +++ b/consul/server.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "errors" "fmt" + "io" "io/ioutil" "log" "net" @@ -806,6 +807,39 @@ func (s *Server) RPC(method string, args interface{}, reply interface{}) error { return codec.err } +// SnapshotRPC dispatches the given snapshot request, reading from the streaming +// input and writing to the streaming output depending on the operation. +func (s *Server) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, + replyFn SnapshotReplyFn) error { + + // Perform the operation. + var reply structs.SnapshotResponse + snap, err := s.dispatchSnapshotRequest(args, in, &reply) + if err != nil { + return err + } + defer func() { + if err := snap.Close(); err != nil { + s.logger.Printf("[ERR] consul: Failed to close snapshot: %v", err) + } + }() + + // Let the caller peek at the reply. + if replyFn != nil { + if err := replyFn(&reply); err != nil { + return err + } + } + + // Stream the snapshot. + if out != nil { + if _, err := io.Copy(out, snap); err != nil { + return fmt.Errorf("failed to stream snapshot: %v", err) + } + } + return nil +} + // InjectEndpoint is used to substitute an endpoint for testing.
func (s *Server) InjectEndpoint(endpoint interface{}) error { s.logger.Printf("[WARN] consul: endpoint injected; this should only be used for testing") diff --git a/consul/snapshot/archive.go b/consul/snapshot/archive.go new file mode 100644 index 000000000..d4eccb43c --- /dev/null +++ b/consul/snapshot/archive.go @@ -0,0 +1,223 @@ +// The archive utilities manage the internal format of a snapshot, which is a +// tar file with the following contents: +// +// meta.json - JSON-encoded snapshot metadata from Raft +// state.bin - Encoded snapshot data from Raft +// SHA256SUMS - SHA-256 sums of the above two files +// +// The integrity information is automatically created and checked, and a failure +// there just looks like an error to the caller. +package snapshot + +import ( + "archive/tar" + "bufio" + "bytes" + "crypto/sha256" + "encoding/json" + "fmt" + "hash" + "io" + "time" + + "github.com/hashicorp/raft" +) + +// hashList manages a list of filenames and their hashes. +type hashList struct { + hashes map[string]hash.Hash +} + +// newHashList returns a new hashList. +func newHashList() *hashList { + return &hashList{ + hashes: make(map[string]hash.Hash), + } +} + +// Add creates a new hash for the given file. +func (hl *hashList) Add(file string) hash.Hash { + if existing, ok := hl.hashes[file]; ok { + return existing + } + + h := sha256.New() + hl.hashes[file] = h + return h +} + +// Encode takes the current sum of all the hashes and saves the hash list as a +// SHA256SUMS-style text file. +func (hl *hashList) Encode(w io.Writer) error { + for file, h := range hl.hashes { + if _, err := fmt.Fprintf(w, "%x %s\n", h.Sum([]byte{}), file); err != nil { + return err + } + } + return nil +} + +// DecodeAndVerify reads a SHA256SUMS-style text file and checks the results +// against the current sums for all the hashes. +func (hl *hashList) DecodeAndVerify(r io.Reader) error { + // Read the file and make sure everything in there has a matching hash. + seen := make(map[string]struct{}) + s := bufio.NewScanner(r) + for s.Scan() { + sha := make([]byte, sha256.Size) + var file string + if _, err := fmt.Sscanf(s.Text(), "%x %s", &sha, &file); err != nil { + return err + } + + h, ok := hl.hashes[file] + if !ok { + return fmt.Errorf("list missing hash for %q", file) + } + if !bytes.Equal(sha, h.Sum([]byte{})) { + return fmt.Errorf("hash check failed for %q", file) + } + seen[file] = struct{}{} + } + if err := s.Err(); err != nil { + return err + } + + // Make sure everything we had a hash for was seen. + for file, _ := range hl.hashes { + if _, ok := seen[file]; !ok { + return fmt.Errorf("file missing for %q", file) + } + } + + return nil +} + +// write takes a writer and creates an archive with the snapshot metadata, +// the snapshot itself, and adds some integrity checking information. +func write(out io.Writer, metadata *raft.SnapshotMeta, snap io.Reader) error { + // Start a new tarball. + now := time.Now() + archive := tar.NewWriter(out) + + // Create a hash list that we will use to write a SHA256SUMS file into + // the archive. + hl := newHashList() + + // Encode the snapshot metadata, which we need to feed back during a + // restore. 
+ metaHash := hl.Add("meta.json") + var metaBuffer bytes.Buffer + enc := json.NewEncoder(&metaBuffer) + if err := enc.Encode(metadata); err != nil { + return fmt.Errorf("failed to encode snapshot metadata: %v", err) + } + if err := archive.WriteHeader(&tar.Header{ + Name: "meta.json", + Mode: 0600, + Size: int64(metaBuffer.Len()), + ModTime: now, + }); err != nil { + return fmt.Errorf("failed to write snapshot metadata header: %v", err) + } + if _, err := io.Copy(archive, io.TeeReader(&metaBuffer, metaHash)); err != nil { + return fmt.Errorf("failed to write snapshot metadata: %v", err) + } + + // Copy the snapshot data given the size from the metadata. + snapHash := hl.Add("state.bin") + if err := archive.WriteHeader(&tar.Header{ + Name: "state.bin", + Mode: 0600, + Size: metadata.Size, + ModTime: now, + }); err != nil { + return fmt.Errorf("failed to write snapshot data header: %v", err) + } + if _, err := io.CopyN(archive, io.TeeReader(snap, snapHash), metadata.Size); err != nil { + return fmt.Errorf("failed to write snapshot data: %v", err) + } + + // Create a SHA256SUMS file that we can use to verify on restore. + var shaBuffer bytes.Buffer + if err := hl.Encode(&shaBuffer); err != nil { + return fmt.Errorf("failed to encode snapshot hashes: %v", err) + } + if err := archive.WriteHeader(&tar.Header{ + Name: "SHA256SUMS", + Mode: 0600, + Size: int64(shaBuffer.Len()), + ModTime: now, + }); err != nil { + return fmt.Errorf("failed to write snapshot hashes header: %v", err) + } + if _, err := io.Copy(archive, &shaBuffer); err != nil { + return fmt.Errorf("failed to write snapshot hashes: %v", err) + } + + // Finalize the archive. + if err := archive.Close(); err != nil { + return fmt.Errorf("failed to finalize snapshot: %v", err) + } + + return nil +} + +// read takes a reader and extracts the snapshot metadata and the snapshot +// itself, and also checks the integrity of the data. A failed integrity check +// surfaces as an error, the same as any other read failure. +func read(in io.Reader, metadata *raft.SnapshotMeta, snap io.Writer) error { + // Start a new tar reader. + archive := tar.NewReader(in) + + // Create a hash list that we will use to compare with the SHA256SUMS + // file in the archive. + hl := newHashList() + + // Populate the hashes for all the files we expect to see. The check at + // the end will make sure these are all present in the SHA256SUMS file + // and that the hashes match. + metaHash := hl.Add("meta.json") + snapHash := hl.Add("state.bin") + + // Look through the archive for the pieces we care about. + var shaBuffer bytes.Buffer + for { + hdr, err := archive.Next() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed reading snapshot: %v", err) + } + + switch hdr.Name { + case "meta.json": + dec := json.NewDecoder(io.TeeReader(archive, metaHash)) + if err := dec.Decode(&metadata); err != nil { + return fmt.Errorf("failed to decode snapshot metadata: %v", err) + } + + case "state.bin": + if _, err := io.Copy(io.MultiWriter(snap, snapHash), archive); err != nil { + return fmt.Errorf("failed to read or write snapshot data: %v", err) + } + + case "SHA256SUMS": + if _, err := io.Copy(&shaBuffer, archive); err != nil { + return fmt.Errorf("failed to read snapshot hashes: %v", err) + } + + default: + return fmt.Errorf("unexpected file %q in snapshot", hdr.Name) + } + + } + + // Verify all the hashes.
+ if err := hl.DecodeAndVerify(&shaBuffer); err != nil { + return fmt.Errorf("failed checking integrity of snapshot: %v", err) + } + + return nil +} diff --git a/consul/snapshot/archive_test.go b/consul/snapshot/archive_test.go new file mode 100644 index 000000000..05fdd378a --- /dev/null +++ b/consul/snapshot/archive_test.go @@ -0,0 +1,134 @@ +package snapshot + +import ( + "bytes" + "crypto/rand" + "fmt" + "io" + "io/ioutil" + "os" + "reflect" + "strings" + "testing" + + "github.com/hashicorp/raft" +) + +func TestArchive(t *testing.T) { + // Create some fake snapshot data. + metadata := raft.SnapshotMeta{ + Index: 2005, + Term: 2011, + Configuration: raft.Configuration{ + Servers: []raft.Server{ + raft.Server{ + Suffrage: raft.Voter, + ID: raft.ServerID("hello"), + Address: raft.ServerAddress("127.0.0.1:8300"), + }, + }, + }, + Size: 1024, + } + var snap bytes.Buffer + var expected bytes.Buffer + both := io.MultiWriter(&snap, &expected) + if _, err := io.Copy(both, io.LimitReader(rand.Reader, 1024)); err != nil { + t.Fatalf("err: %v", err) + } + + // Write out the snapshot. + var archive bytes.Buffer + if err := write(&archive, &metadata, &snap); err != nil { + t.Fatalf("err: %v", err) + } + + // Read the snapshot back. + var newMeta raft.SnapshotMeta + var newSnap bytes.Buffer + if err := read(&archive, &newMeta, &newSnap); err != nil { + t.Fatalf("err: %v", err) + } + + // Check the contents. + if !reflect.DeepEqual(newMeta, metadata) { + t.Fatalf("bad: %#v", newMeta) + } + var buf bytes.Buffer + if _, err := io.Copy(&buf, &newSnap); err != nil { + t.Fatalf("err: %v", err) + } + if !bytes.Equal(buf.Bytes(), expected.Bytes()) { + t.Fatalf("snapshot contents didn't match") + } +} + +func TestArchive_BadData(t *testing.T) { + cases := []struct { + Name string + Error string + }{ + {"../../test/snapshot/empty.tar", "failed checking integrity of snapshot"}, + {"../../test/snapshot/extra.tar", "unexpected file \"nope\""}, + {"../../test/snapshot/missing-meta.tar", "hash check failed for \"meta.json\""}, + {"../../test/snapshot/missing-state.tar", "hash check failed for \"state.bin\""}, + {"../../test/snapshot/missing-sha.tar", "file missing"}, + {"../../test/snapshot/corrupt-meta.tar", "hash check failed for \"meta.json\""}, + {"../../test/snapshot/corrupt-state.tar", "hash check failed for \"state.bin\""}, + {"../../test/snapshot/corrupt-sha.tar", "list missing hash for \"nope\""}, + } + for i, c := range cases { + f, err := os.Open(c.Name) + if err != nil { + t.Fatalf("err: %v", err) + } + defer f.Close() + + var metadata raft.SnapshotMeta + err = read(f, &metadata, ioutil.Discard) + if err == nil || !strings.Contains(err.Error(), c.Error) { + t.Fatalf("case %d (%s): %v", i, c.Name, err) + } + } +} + +func TestArchive_hashList(t *testing.T) { + hl := newHashList() + for i := 0; i < 16; i++ { + h := hl.Add(fmt.Sprintf("file-%d", i)) + if _, err := io.CopyN(h, rand.Reader, 32); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Do a normal round trip. + var buf bytes.Buffer + if err := hl.Encode(&buf); err != nil { + t.Fatalf("err: %v", err) + } + if err := hl.DecodeAndVerify(&buf); err != nil { + t.Fatalf("err: %v", err) + } + + // Have a local hash that isn't in the file. + buf.Reset() + if err := hl.Encode(&buf); err != nil { + t.Fatalf("err: %v", err) + } + hl.Add("nope") + err := hl.DecodeAndVerify(&buf) + if err == nil || !strings.Contains(err.Error(), "file missing for \"nope\"") { + t.Fatalf("err: %v", err) + } + + // Have a hash in the file that we haven't seen locally. 
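+	// (Encode still includes "nope" from the case above; deleting the
+	// local entry afterwards leaves a line in the file with no matching
+	// hash, which DecodeAndVerify must reject.)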
+	buf.Reset()
+	if err := hl.Encode(&buf); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	delete(hl.hashes, "nope")
+	err = hl.DecodeAndVerify(&buf)
+	if err == nil || !strings.Contains(err.Error(), "list missing hash for \"nope\"") {
+		t.Fatalf("err: %v", err)
+	}
+}
diff --git a/consul/snapshot/snapshot.go b/consul/snapshot/snapshot.go
new file mode 100644
index 000000000..6f7ee21c1
--- /dev/null
+++ b/consul/snapshot/snapshot.go
@@ -0,0 +1,193 @@
+// Package snapshot manages the interactions between Consul and Raft in order
+// to take and restore snapshots for disaster recovery. The internal format of
+// a snapshot is a gzip-compressed tar file, as described in archive.go.
+package snapshot
+
+import (
+	"compress/gzip"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"os"
+	"time"
+
+	"github.com/hashicorp/raft"
+)
+
+// Snapshot is a structure that holds state about a temporary file that is used
+// to hold a snapshot. By using an intermediate file we avoid holding everything
+// in memory.
+type Snapshot struct {
+	file  *os.File
+	index uint64
+}
+
+// New takes a state snapshot of the given Raft instance into a temporary file
+// and returns an object that gives access to the file as an io.Reader. You must
+// arrange to call Close() on the returned object or else you will leak a
+// temporary file.
+func New(logger *log.Logger, r *raft.Raft) (*Snapshot, error) {
+	// Take the snapshot.
+	future := r.Snapshot()
+	if err := future.Error(); err != nil {
+		return nil, fmt.Errorf("Raft error when taking snapshot: %v", err)
+	}
+
+	// Open up the snapshot.
+	metadata, snap, err := future.Open()
+	if err != nil {
+		return nil, fmt.Errorf("failed to open snapshot: %v", err)
+	}
+	defer func() {
+		if err := snap.Close(); err != nil {
+			logger.Printf("[ERR] snapshot: Failed to close Raft snapshot: %v", err)
+		}
+	}()
+
+	// Make a scratch file to receive the contents so that we don't buffer
+	// everything in memory. This gets deleted in Close() since we keep it
+	// around for re-reading.
+	archive, err := ioutil.TempFile("", "snapshot")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create snapshot file: %v", err)
+	}
+
+	// If anything goes wrong after this point, we will attempt to clean up
+	// the temp file. The happy path will disarm this.
+	var keep bool
+	defer func() {
+		if keep {
+			return
+		}
+
+		if err := os.Remove(archive.Name()); err != nil {
+			logger.Printf("[ERR] snapshot: Failed to clean up temp snapshot: %v", err)
+		}
+	}()
+
+	// Wrap the file writer in a gzip compressor.
+	compressor := gzip.NewWriter(archive)
+
+	// Write the archive.
+	if err := write(compressor, metadata, snap); err != nil {
+		return nil, fmt.Errorf("failed to write snapshot file: %v", err)
+	}
+
+	// Finish the compressed stream.
+	if err := compressor.Close(); err != nil {
+		return nil, fmt.Errorf("failed to compress snapshot file: %v", err)
+	}
+
+	// Sync the compressed file and rewind it so it's ready to be streamed
+	// out by the caller.
+	if err := archive.Sync(); err != nil {
+		return nil, fmt.Errorf("failed to sync snapshot: %v", err)
+	}
+	if _, err := archive.Seek(0, 0); err != nil {
+		return nil, fmt.Errorf("failed to rewind snapshot: %v", err)
+	}
+
+	keep = true
+	return &Snapshot{archive, metadata.Index}, nil
+}
+
+// Index returns the index of the snapshot. This is safe to call on a nil
+// snapshot; it will just return 0.
+func (s *Snapshot) Index() uint64 {
+	if s == nil {
+		return 0
+	}
+	return s.index
+}
+
+// Read passes through to the underlying snapshot file. This is safe to call on
+// a nil snapshot; it will just return io.EOF.
+func (s *Snapshot) Read(p []byte) (n int, err error) {
+	if s == nil {
+		return 0, io.EOF
+	}
+	return s.file.Read(p)
+}
+
+// Close closes the snapshot and removes any temporary storage associated with
+// it. You must arrange to call this whenever New() has returned successfully.
+// This is safe to call on a nil snapshot.
+func (s *Snapshot) Close() error {
+	if s == nil {
+		return nil
+	}
+
+	if err := s.file.Close(); err != nil {
+		return err
+	}
+	return os.Remove(s.file.Name())
+}
+
+// Verify takes the snapshot from the reader and verifies its contents.
+func Verify(in io.Reader) error {
+	// Wrap the reader in a gzip decompressor.
+	decomp, err := gzip.NewReader(in)
+	if err != nil {
+		return fmt.Errorf("failed to decompress snapshot: %v", err)
+	}
+	defer decomp.Close()
+
+	// Read the archive, throwing away the snapshot data.
+	var metadata raft.SnapshotMeta
+	if err := read(decomp, &metadata, ioutil.Discard); err != nil {
+		return fmt.Errorf("failed to read snapshot file: %v", err)
+	}
+	return nil
+}
+
+// Restore takes the snapshot from the reader and attempts to apply it to the
+// given Raft instance.
+func Restore(logger *log.Logger, in io.Reader, r *raft.Raft) error {
+	// Wrap the reader in a gzip decompressor.
+	decomp, err := gzip.NewReader(in)
+	if err != nil {
+		return fmt.Errorf("failed to decompress snapshot: %v", err)
+	}
+	defer func() {
+		if err := decomp.Close(); err != nil {
+			logger.Printf("[ERR] snapshot: Failed to close snapshot decompressor: %v", err)
+		}
+	}()
+
+	// Make a scratch file to receive the contents of the snapshot data so
+	// we can avoid buffering in memory.
+	snap, err := ioutil.TempFile("", "snapshot")
+	if err != nil {
+		return fmt.Errorf("failed to create temp snapshot file: %v", err)
+	}
+	defer func() {
+		if err := snap.Close(); err != nil {
+			logger.Printf("[ERR] snapshot: Failed to close temp snapshot: %v", err)
+		}
+		if err := os.Remove(snap.Name()); err != nil {
+			logger.Printf("[ERR] snapshot: Failed to clean up temp snapshot: %v", err)
+		}
+	}()
+
+	// Read the archive.
+	var metadata raft.SnapshotMeta
+	if err := read(decomp, &metadata, snap); err != nil {
+		return fmt.Errorf("failed to read snapshot file: %v", err)
+	}
+
+	// Sync and rewind the file so it's ready to be read again.
+	if err := snap.Sync(); err != nil {
+		return fmt.Errorf("failed to sync temp snapshot: %v", err)
+	}
+	if _, err := snap.Seek(0, 0); err != nil {
+		return fmt.Errorf("failed to rewind temp snapshot: %v", err)
+	}
+
+	// Feed the snapshot into Raft.
+	if err := r.Restore(&metadata, snap, 60*time.Second); err != nil {
+		return fmt.Errorf("Raft error when restoring snapshot: %v", err)
+	}
+
+	return nil
+}
diff --git a/consul/snapshot/snapshot_test.go b/consul/snapshot/snapshot_test.go
new file mode 100644
index 000000000..406c29a1e
--- /dev/null
+++ b/consul/snapshot/snapshot_test.go
@@ -0,0 +1,297 @@
+package snapshot
+
+import (
+	"bytes"
+	"crypto/rand"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"os"
+	"path"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/go-msgpack/codec"
+	"github.com/hashicorp/raft"
+)
+
+// MockFSM is a simple FSM for testing that stores its logs in a slice of
+// byte slices.
+type MockFSM struct {
+	sync.Mutex
+	logs [][]byte
+}
+
+// MockSnapshot is an FSM snapshot for testing that encodes the contents of a
+// MockFSM using msgpack.
+type MockSnapshot struct {
+	logs     [][]byte
+	maxIndex int
+}
+
+// See raft.FSM.
+func (m *MockFSM) Apply(log *raft.Log) interface{} {
+	m.Lock()
+	defer m.Unlock()
+	m.logs = append(m.logs, log.Data)
+	return len(m.logs)
+}
+
+// See raft.FSM.
+func (m *MockFSM) Snapshot() (raft.FSMSnapshot, error) {
+	m.Lock()
+	defer m.Unlock()
+	return &MockSnapshot{m.logs, len(m.logs)}, nil
+}
+
+// See raft.FSM.
+func (m *MockFSM) Restore(in io.ReadCloser) error {
+	m.Lock()
+	defer m.Unlock()
+	defer in.Close()
+	hd := codec.MsgpackHandle{}
+	dec := codec.NewDecoder(in, &hd)
+
+	m.logs = nil
+	return dec.Decode(&m.logs)
+}
+
+// See raft.FSMSnapshot.
+func (m *MockSnapshot) Persist(sink raft.SnapshotSink) error {
+	hd := codec.MsgpackHandle{}
+	enc := codec.NewEncoder(sink, &hd)
+	if err := enc.Encode(m.logs[:m.maxIndex]); err != nil {
+		sink.Cancel()
+		return err
+	}
+	sink.Close()
+	return nil
+}
+
+// See raft.FSMSnapshot.
+func (m *MockSnapshot) Release() {
+}
+
+// makeRaft returns a Raft and its FSM, with snapshots based in the given dir.
+func makeRaft(t *testing.T, dir string) (*raft.Raft, *MockFSM) {
+	snaps, err := raft.NewFileSnapshotStore(dir, 5, nil)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	fsm := &MockFSM{}
+	store := raft.NewInmemStore()
+	addr, trans := raft.NewInmemTransport("")
+
+	config := raft.DefaultConfig()
+	config.LocalID = raft.ServerID(fmt.Sprintf("server-%s", addr))
+
+	var members raft.Configuration
+	members.Servers = append(members.Servers, raft.Server{
+		Suffrage: raft.Voter,
+		ID:       config.LocalID,
+		Address:  addr,
+	})
+
+	err = raft.BootstrapCluster(config, store, store, snaps, trans, members)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	raft, err := raft.NewRaft(config, fsm, store, store, snaps, trans)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	timeout := time.After(10 * time.Second)
+	for {
+		if raft.Leader() != "" {
+			break
+		}
+
+		select {
+		case <-raft.LeaderCh():
+		case <-time.After(1 * time.Second):
+			// Need to poll because we might have missed the first
+			// go with the leader channel.
+		case <-timeout:
+			t.Fatalf("timed out waiting for leader")
+		}
+	}
+
+	return raft, fsm
+}
+
+func TestSnapshot(t *testing.T) {
+	dir, err := ioutil.TempDir("", "snapshot")
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer os.RemoveAll(dir)
+
+	// Make a Raft and populate it with some data. We tee everything we
+	// apply off to a buffer for checking post-snapshot.
+	var expected []bytes.Buffer
+	before, _ := makeRaft(t, path.Join(dir, "before"))
+	defer before.Shutdown()
+	for i := 0; i < 64*1024; i++ {
+		var log bytes.Buffer
+		var copy bytes.Buffer
+		both := io.MultiWriter(&log, &copy)
+		if _, err := io.CopyN(both, rand.Reader, 256); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		future := before.Apply(log.Bytes(), time.Second)
+		if err := future.Error(); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		expected = append(expected, copy)
+	}
+
+	// Take a snapshot.
+	logger := log.New(os.Stdout, "", 0)
+	snap, err := New(logger, before)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer snap.Close()
+
+	// Verify the snapshot. We have to rewind it after for the restore.
+	if err := Verify(snap); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if _, err := snap.file.Seek(0, 0); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Make a new, independent Raft.
+	after, fsm := makeRaft(t, path.Join(dir, "after"))
+	defer after.Shutdown()
+
+	// Put some initial data in there that the snapshot should overwrite.
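+	// (These entries go only to the second cluster; a successful restore
+	// should replace them entirely with the snapshot contents.)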
+ for i := 0; i < 16; i++ { + var log bytes.Buffer + if _, err := io.CopyN(&log, rand.Reader, 256); err != nil { + t.Fatalf("err: %v", err) + } + future := after.Apply(log.Bytes(), time.Second) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Restore the snapshot. + if err := Restore(logger, snap, after); err != nil { + t.Fatalf("err: %v", err) + } + + // Compare the contents. + fsm.Lock() + defer fsm.Unlock() + if len(fsm.logs) != len(expected) { + t.Fatalf("bad: %d vs. %d", len(fsm.logs), len(expected)) + } + for i, _ := range fsm.logs { + if !bytes.Equal(fsm.logs[i], expected[i].Bytes()) { + t.Fatalf("bad: log %d doesn't match", i) + } + } +} + +func TestSnapshot_Nil(t *testing.T) { + var snap *Snapshot + + if idx := snap.Index(); idx != 0 { + t.Fatalf("bad: %d", idx) + } + + n, err := snap.Read(make([]byte, 16)) + if n != 0 || err != io.EOF { + t.Fatalf("bad: %d %v", n, err) + } + + if err := snap.Close(); err != nil { + t.Fatalf("err: %v", err) + } +} + +func TestSnapshot_BadVerify(t *testing.T) { + buf := bytes.NewBuffer([]byte("nope")) + err := Verify(buf) + if err == nil || !strings.Contains(err.Error(), "unexpected EOF") { + t.Fatalf("err: %v", err) + } +} + +func TestSnapshot_BadRestore(t *testing.T) { + dir, err := ioutil.TempDir("", "snapshot") + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dir) + + // Make a Raft and populate it with some data. + before, _ := makeRaft(t, path.Join(dir, "before")) + defer before.Shutdown() + for i := 0; i < 16*1024; i++ { + var log bytes.Buffer + if _, err := io.CopyN(&log, rand.Reader, 256); err != nil { + t.Fatalf("err: %v", err) + } + future := before.Apply(log.Bytes(), time.Second) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Take a snapshot. + logger := log.New(os.Stdout, "", 0) + snap, err := New(logger, before) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Make a new, independent Raft. + after, fsm := makeRaft(t, path.Join(dir, "after")) + defer after.Shutdown() + + // Put some initial data in there that should not be harmed by the + // failed restore attempt. + var expected []bytes.Buffer + for i := 0; i < 16; i++ { + var log bytes.Buffer + var copy bytes.Buffer + both := io.MultiWriter(&log, ©) + if _, err := io.CopyN(both, rand.Reader, 256); err != nil { + t.Fatalf("err: %v", err) + } + future := after.Apply(log.Bytes(), time.Second) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + expected = append(expected, copy) + } + + // Attempt to restore a truncated version of the snapshot. This is + // expected to fail. + err = Restore(logger, io.LimitReader(snap, 512), after) + if err == nil || !strings.Contains(err.Error(), "unexpected EOF") { + t.Fatalf("err: %v", err) + } + + // Compare the contents to make sure the aborted restore didn't harm + // anything. + fsm.Lock() + defer fsm.Unlock() + if len(fsm.logs) != len(expected) { + t.Fatalf("bad: %d vs. %d", len(fsm.logs), len(expected)) + } + for i, _ := range fsm.logs { + if !bytes.Equal(fsm.logs[i], expected[i].Bytes()) { + t.Fatalf("bad: log %d doesn't match", i) + } + } +} diff --git a/consul/snapshot_endpoint.go b/consul/snapshot_endpoint.go new file mode 100644 index 000000000..1a34d393d --- /dev/null +++ b/consul/snapshot_endpoint.go @@ -0,0 +1,224 @@ +// The snapshot endpoint is a special non-RPC endpoint that supports streaming +// for taking and restoring snapshots for disaster recovery. 
This gets wired +// directly into Consul's stream handler, and a new TCP connection is made for +// each request. +// +// This also includes a SnapshotRPC() function, which acts as a lightweight +// client that knows the details of the stream protocol. +package consul + +import ( + "bytes" + "errors" + "fmt" + "io" + "io/ioutil" + "net" + + "github.com/hashicorp/consul/consul/snapshot" + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/go-msgpack/codec" +) + +const ( + // noSnapshotsInDevMode indicates that snapshots aren't available for a + // server in dev mode. + noSnapshotsInDevMode = "Snapshot operations not available in dev mode" +) + +// dispatchSnapshotRequest takes an incoming request structure with possibly some +// streaming data (for a restore) and returns possibly some streaming data (for +// a snapshot save). We can't use the normal RPC mechanism in a streaming manner +// like this, so we have to dispatch these by hand. +func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Reader, + reply *structs.SnapshotResponse) (io.ReadCloser, error) { + + // Perform DC forwarding. + if dc := args.Datacenter; dc != s.config.Datacenter { + server, ok := s.getRemoteServer(dc) + if !ok { + return nil, structs.ErrNoDCPath + } + return SnapshotRPC(s.connPool, dc, server.Addr, args, in, reply) + } + + // Perform leader forwarding if required. + if !args.AllowStale { + if isLeader, server := s.getLeader(); !isLeader { + if server == nil { + return nil, structs.ErrNoLeader + } + return SnapshotRPC(s.connPool, args.Datacenter, server.Addr, args, in, reply) + } + } + + // Snapshots don't work in dev mode because we need Raft's snapshots to + // be readable. Better to present a clear error than one from deep down + // in the Raft snapshot store. + if s.config.DevMode { + return nil, errors.New(noSnapshotsInDevMode) + } + + // Verify token is allowed to operate on snapshots. There's only a + // single ACL sense here (not read and write) since reading gets you + // all the ACLs and you could escalate from there. + if acl, err := s.resolveToken(args.Token); err != nil { + return nil, err + } else if acl != nil && !acl.Snapshot() { + return nil, permissionDeniedErr + } + + // Dispatch the operation. + switch args.Op { + case structs.SnapshotSave: + if !args.AllowStale { + if err := s.consistentRead(); err != nil { + return nil, err + } + } + + // Set the metadata here before we do anything; this should always be + // pessimistic if we get more data while the snapshot is being taken. + s.setQueryMeta(&reply.QueryMeta) + + // Take the snapshot and capture the index. + snap, err := snapshot.New(s.logger, s.raft) + reply.Index = snap.Index() + return snap, err + + case structs.SnapshotRestore: + if args.AllowStale { + return nil, fmt.Errorf("stale not allowed for restore") + } + + // Restore the snapshot. + if err := snapshot.Restore(s.logger, in, s.raft); err != nil { + return nil, err + } + + // Run a barrier so we are sure that our FSM is caught up with + // any snapshot restore details (it's also part of Raft's restore + // process but we don't want to depend on that detail for this to + // be correct). Once that works, we can redo the leader actions + // so our leader-maintained state will be up to date. 
+ barrier := s.raft.Barrier(0) + if err := barrier.Error(); err != nil { + return nil, err + } + if err := s.revokeLeadership(); err != nil { + return nil, err + } + if err := s.establishLeadership(); err != nil { + return nil, err + } + + // Give the caller back an empty reader since there's nothing to + // stream back. + return ioutil.NopCloser(bytes.NewReader([]byte(""))), nil + + default: + return nil, fmt.Errorf("unrecognized snapshot op %q", args.Op) + } +} + +// handleSnapshotRequest reads the request from the conn and dispatches it. This +// will be called from a goroutine after an incoming stream is determined to be +// a snapshot request. +func (s *Server) handleSnapshotRequest(conn net.Conn) error { + var args structs.SnapshotRequest + dec := codec.NewDecoder(conn, &codec.MsgpackHandle{}) + if err := dec.Decode(&args); err != nil { + return fmt.Errorf("failed to decode request: %v", err) + } + + var reply structs.SnapshotResponse + snap, err := s.dispatchSnapshotRequest(&args, conn, &reply) + if err != nil { + reply.Error = err.Error() + goto RESPOND + } + defer func() { + if err := snap.Close(); err != nil { + s.logger.Printf("[ERR] consul: Failed to close snapshot: %v", err) + } + }() + +RESPOND: + enc := codec.NewEncoder(conn, &codec.MsgpackHandle{}) + if err := enc.Encode(&reply); err != nil { + return fmt.Errorf("failed to encode response: %v", err) + } + if snap != nil { + if _, err := io.Copy(conn, snap); err != nil { + return fmt.Errorf("failed to stream snapshot: %v", err) + } + } + + return nil +} + +// SnapshotRPC is a streaming client function for performing a snapshot RPC +// request to a remote server. It will create a fresh connection for each +// request, send the request header, and then stream in any data from the +// reader (for a restore). It will then parse the received response header, and +// if there's no error will return an io.ReadCloser (that you must close) with +// the streaming output (for a snapshot). If the reply contains an error, this +// will always return an error as well, so you don't need to check the error +// inside the filled-in reply. +func SnapshotRPC(pool *ConnPool, dc string, addr net.Addr, + args *structs.SnapshotRequest, in io.Reader, reply *structs.SnapshotResponse) (io.ReadCloser, error) { + + conn, hc, err := pool.Dial(dc, addr) + if err != nil { + return nil, err + } + + // keep will disarm the defer on success if we are returning the caller + // our connection to stream the output. + var keep bool + defer func() { + if !keep { + conn.Close() + } + }() + + // Write the snapshot RPC byte to set the mode, then perform the + // request. + if _, err := conn.Write([]byte{byte(rpcSnapshot)}); err != nil { + return nil, fmt.Errorf("failed to write stream type: %v", err) + } + + // Push the header encoded as msgpack, then stream the input. + enc := codec.NewEncoder(conn, &codec.MsgpackHandle{}) + if err := enc.Encode(&args); err != nil { + return nil, fmt.Errorf("failed to encode request: %v", err) + } + if _, err := io.Copy(conn, in); err != nil { + return nil, fmt.Errorf("failed to copy snapshot in: %v", err) + } + + // Our RPC protocol requires support for a half-close in order to signal + // the other side that they are done reading the stream, since we don't + // know the size in advance. This saves us from having to buffer just to + // calculate the size. 
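+	// (A connection type that cannot half-close would leave the server
+	// unable to tell when the request stream ends, so it is rejected
+	// outright below.)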
+	if hc != nil {
+		if err := hc.CloseWrite(); err != nil {
+			return nil, fmt.Errorf("failed to half close snapshot connection: %v", err)
+		}
+	} else {
+		return nil, fmt.Errorf("snapshot connection requires half-close support")
+	}
+
+	// Pull the header decoded as msgpack. The caller can continue to read
+	// the conn to stream the remaining data.
+	dec := codec.NewDecoder(conn, &codec.MsgpackHandle{})
+	if err := dec.Decode(reply); err != nil {
+		return nil, fmt.Errorf("failed to decode response: %v", err)
+	}
+	if reply.Error != "" {
+		return nil, errors.New(reply.Error)
+	}
+
+	keep = true
+	return conn, nil
+}
diff --git a/consul/snapshot_endpoint_test.go b/consul/snapshot_endpoint_test.go
new file mode 100644
index 000000000..1723b8fb5
--- /dev/null
+++ b/consul/snapshot_endpoint_test.go
@@ -0,0 +1,416 @@
+package consul
+
+import (
+	"bytes"
+	"fmt"
+	"os"
+	"strings"
+	"testing"
+
+	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/testutil"
+	"github.com/hashicorp/net-rpc-msgpackrpc"
+)
+
+// verifySnapshot is a helper that does a snapshot and restore.
+func verifySnapshot(t *testing.T, s *Server, dc, token string) {
+	codec := rpcClient(t, s)
+	defer codec.Close()
+
+	// Set a key to a before value.
+	{
+		args := structs.KVSRequest{
+			Datacenter: dc,
+			Op:         structs.KVSSet,
+			DirEnt: structs.DirEntry{
+				Key:   "test",
+				Value: []byte("hello"),
+			},
+			WriteRequest: structs.WriteRequest{
+				Token: token,
+			},
+		}
+		var out bool
+		if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &args, &out); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
+	// Take a snapshot.
+	args := structs.SnapshotRequest{
+		Datacenter: dc,
+		Token:      token,
+		Op:         structs.SnapshotSave,
+	}
+	var reply structs.SnapshotResponse
+	snap, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
+		&args, bytes.NewReader([]byte("")), &reply)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer snap.Close()
+
+	// Read back the before value.
+	{
+		getR := structs.KeyRequest{
+			Datacenter: dc,
+			Key:        "test",
+			QueryOptions: structs.QueryOptions{
+				Token: token,
+			},
+		}
+		var dirent structs.IndexedDirEntries
+		if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		if len(dirent.Entries) != 1 {
+			t.Fatalf("Bad: %v", dirent)
+		}
+		d := dirent.Entries[0]
+		if string(d.Value) != "hello" {
+			t.Fatalf("bad: %v", d)
+		}
+	}
+
+	// Set a key to an after value.
+	{
+		args := structs.KVSRequest{
+			Datacenter: dc,
+			Op:         structs.KVSSet,
+			DirEnt: structs.DirEntry{
+				Key:   "test",
+				Value: []byte("goodbye"),
+			},
+			WriteRequest: structs.WriteRequest{
+				Token: token,
+			},
+		}
+		var out bool
+		if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &args, &out); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
+	// Read back the after value.
+	{
+		getR := structs.KeyRequest{
+			Datacenter: dc,
+			Key:        "test",
+			QueryOptions: structs.QueryOptions{
+				Token: token,
+			},
+		}
+		var dirent structs.IndexedDirEntries
+		if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		if len(dirent.Entries) != 1 {
+			t.Fatalf("Bad: %v", dirent)
+		}
+		d := dirent.Entries[0]
+		if string(d.Value) != "goodbye" {
+			t.Fatalf("bad: %v", d)
+		}
+	}
+
+	// Restore the snapshot.
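+	// (The request header is reused with the op switched to restore, and
+	// the saved snapshot stream is fed back in as the request body.)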
+	args.Op = structs.SnapshotRestore
+	restore, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
+		&args, snap, &reply)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer restore.Close()
+
+	// Read back the before value post-snapshot.
+	{
+		getR := structs.KeyRequest{
+			Datacenter: dc,
+			Key:        "test",
+			QueryOptions: structs.QueryOptions{
+				Token: token,
+			},
+		}
+		var dirent structs.IndexedDirEntries
+		if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		if len(dirent.Entries) != 1 {
+			t.Fatalf("Bad: %v", dirent)
+		}
+		d := dirent.Entries[0]
+		if string(d.Value) != "hello" {
+			t.Fatalf("bad: %v", d)
+		}
+	}
+}
+
+func TestSnapshot(t *testing.T) {
+	dir1, s1 := testServer(t)
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+
+	testutil.WaitForLeader(t, s1.RPC, "dc1")
+	verifySnapshot(t, s1, "dc1", "")
+}
+
+func TestSnapshot_LeaderState(t *testing.T) {
+	dir1, s1 := testServer(t)
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+
+	testutil.WaitForLeader(t, s1.RPC, "dc1")
+
+	codec := rpcClient(t, s1)
+	defer codec.Close()
+
+	// Make a before session.
+	var before string
+	{
+		args := structs.SessionRequest{
+			Datacenter: s1.config.Datacenter,
+			Op:         structs.SessionCreate,
+			Session: structs.Session{
+				Node: s1.config.NodeName,
+				TTL:  "60s",
+			},
+		}
+		if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &args, &before); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
+	// Take a snapshot.
+	args := structs.SnapshotRequest{
+		Datacenter: s1.config.Datacenter,
+		Op:         structs.SnapshotSave,
+	}
+	var reply structs.SnapshotResponse
+	snap, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
+		&args, bytes.NewReader([]byte("")), &reply)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer snap.Close()
+
+	// Make an after session.
+	var after string
+	{
+		args := structs.SessionRequest{
+			Datacenter: s1.config.Datacenter,
+			Op:         structs.SessionCreate,
+			Session: structs.Session{
+				Node: s1.config.NodeName,
+				TTL:  "60s",
+			},
+		}
+		if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &args, &after); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
+	// Make sure the leader has timers set up.
+	if _, ok := s1.sessionTimers[before]; !ok {
+		t.Fatalf("missing session timer")
+	}
+	if _, ok := s1.sessionTimers[after]; !ok {
+		t.Fatalf("missing session timer")
+	}
+
+	// Restore the snapshot.
+	args.Op = structs.SnapshotRestore
+	restore, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
+		&args, snap, &reply)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer restore.Close()
+
+	// Make sure the before timer is still there, and that the after timer
+	// got reverted. This proves we fully cycled the leader state.
+	if _, ok := s1.sessionTimers[before]; !ok {
+		t.Fatalf("missing session timer")
+	}
+	if _, ok := s1.sessionTimers[after]; ok {
+		t.Fatalf("unexpected session timer")
+	}
+}
+
+func TestSnapshot_ACLDeny(t *testing.T) {
+	dir1, s1 := testServerWithConfig(t, func(c *Config) {
+		c.ACLDatacenter = "dc1"
+		c.ACLMasterToken = "root"
+		c.ACLDefaultPolicy = "deny"
+	})
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	defer codec.Close()
+
+	testutil.WaitForLeader(t, s1.RPC, "dc1")
+
+	// Take a snapshot.
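+	// (No token is supplied, so with a default-deny policy this save
+	// attempt should fail with a permission denied error.)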
+	func() {
+		args := structs.SnapshotRequest{
+			Datacenter: "dc1",
+			Op:         structs.SnapshotSave,
+		}
+		var reply structs.SnapshotResponse
+		_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
+			&args, bytes.NewReader([]byte("")), &reply)
+		if err == nil || !strings.Contains(err.Error(), permissionDenied) {
+			t.Fatalf("err: %v", err)
+		}
+	}()
+
+	// Restore a snapshot.
+	func() {
+		args := structs.SnapshotRequest{
+			Datacenter: "dc1",
+			Op:         structs.SnapshotRestore,
+		}
+		var reply structs.SnapshotResponse
+		_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
+			&args, bytes.NewReader([]byte("")), &reply)
+		if err == nil || !strings.Contains(err.Error(), permissionDenied) {
+			t.Fatalf("err: %v", err)
+		}
+	}()
+
+	// With the token in place everything should go through.
+	verifySnapshot(t, s1, "dc1", "root")
+}
+
+func TestSnapshot_Forward_Leader(t *testing.T) {
+	dir1, s1 := testServerWithConfig(t, func(c *Config) {
+		c.Bootstrap = true
+	})
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+
+	dir2, s2 := testServerWithConfig(t, func(c *Config) {
+		c.Bootstrap = false
+	})
+	defer os.RemoveAll(dir2)
+	defer s2.Shutdown()
+
+	// Try to join.
+	addr := fmt.Sprintf("127.0.0.1:%d",
+		s1.config.SerfLANConfig.MemberlistConfig.BindPort)
+	if _, err := s2.JoinLAN([]string{addr}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	testutil.WaitForLeader(t, s1.RPC, "dc1")
+	testutil.WaitForLeader(t, s2.RPC, "dc1")
+
+	// Run against the leader and the follower to ensure we forward.
+	for _, s := range []*Server{s1, s2} {
+		verifySnapshot(t, s, "dc1", "")
+		verifySnapshot(t, s, "dc1", "")
+	}
+}
+
+func TestSnapshot_Forward_Datacenter(t *testing.T) {
+	dir1, s1 := testServerDC(t, "dc1")
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+
+	dir2, s2 := testServerDC(t, "dc2")
+	defer os.RemoveAll(dir2)
+	defer s2.Shutdown()
+
+	testutil.WaitForLeader(t, s1.RPC, "dc1")
+	testutil.WaitForLeader(t, s2.RPC, "dc2")
+
+	// Try to WAN join.
+	addr := fmt.Sprintf("127.0.0.1:%d",
+		s1.config.SerfWANConfig.MemberlistConfig.BindPort)
+	if _, err := s2.JoinWAN([]string{addr}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	testutil.WaitForResult(
+		func() (bool, error) {
+			return len(s1.WANMembers()) > 1, nil
+		},
+		func(err error) {
+			t.Fatalf("Failed waiting for WAN join: %v", err)
+		})
+
+	// Run a snapshot from each server locally and remotely to ensure we
+	// forward.
+	for _, s := range []*Server{s1, s2} {
+		verifySnapshot(t, s, "dc1", "")
+		verifySnapshot(t, s, "dc2", "")
+	}
+}
+
+func TestSnapshot_AllowStale(t *testing.T) {
+	dir1, s1 := testServerWithConfig(t, func(c *Config) {
+		c.Bootstrap = false
+	})
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+
+	dir2, s2 := testServerWithConfig(t, func(c *Config) {
+		c.Bootstrap = false
+	})
+	defer os.RemoveAll(dir2)
+	defer s2.Shutdown()
+
+	// Run against the servers, which haven't been set up to establish
+	// a leader, and make sure we get a no leader error.
+	for _, s := range []*Server{s1, s2} {
+		// Take a snapshot.
+		args := structs.SnapshotRequest{
+			Datacenter: s.config.Datacenter,
+			Op:         structs.SnapshotSave,
+		}
+		var reply structs.SnapshotResponse
+		_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
+			&args, bytes.NewReader([]byte("")), &reply)
+		if err == nil || !strings.Contains(err.Error(), structs.ErrNoLeader.Error()) {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
+	// Run in stale mode and make sure we get an error from Raft (snapshot
+	// was attempted), and not a no leader error.
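+	// (AllowStale skips both leader forwarding and the consistent-read
+	// check, so the snapshot is attempted locally and any failure comes
+	// from Raft itself.)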
+	for _, s := range []*Server{s1, s2} {
+		// Take a snapshot.
+		args := structs.SnapshotRequest{
+			Datacenter: s.config.Datacenter,
+			AllowStale: true,
+			Op:         structs.SnapshotSave,
+		}
+		var reply structs.SnapshotResponse
+		_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
+			&args, bytes.NewReader([]byte("")), &reply)
+		if err == nil || !strings.Contains(err.Error(), "Raft error when taking snapshot") {
+			t.Fatalf("err: %v", err)
+		}
+	}
+}
+
+func TestSnapshot_DevMode(t *testing.T) {
+	dir1, s1 := testServerWithConfig(t, func(c *Config) {
+		c.DevMode = true
+	})
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	defer codec.Close()
+
+	testutil.WaitForLeader(t, s1.RPC, "dc1")
+
+	args := structs.SnapshotRequest{
+		Datacenter: s1.config.Datacenter,
+		Op:         structs.SnapshotSave,
+	}
+	var reply structs.SnapshotResponse
+	_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
+		&args, bytes.NewReader([]byte("")), &reply)
+	if err == nil || !strings.Contains(err.Error(), noSnapshotsInDevMode) {
+		t.Fatalf("err: %v", err)
+	}
+}
diff --git a/consul/structs/snapshot.go b/consul/structs/snapshot.go
new file mode 100644
index 000000000..3d65e317f
--- /dev/null
+++ b/consul/structs/snapshot.go
@@ -0,0 +1,40 @@
+package structs
+
+type SnapshotOp int
+
+const (
+	SnapshotSave SnapshotOp = iota
+	SnapshotRestore
+)
+
+// SnapshotRequest is used as a header for a snapshot RPC request. This will
+// precede any streaming data that's part of the request and is
+// MessagePack-encoded on the wire.
+type SnapshotRequest struct {
+	// Datacenter is the target datacenter for this request. The request
+	// will be forwarded if necessary.
+	Datacenter string
+
+	// Token is the ACL token to use for the operation. If ACLs are enabled
+	// then all operations require a management token.
+	Token string
+
+	// If set, any follower can service the request. Results may be
+	// arbitrarily stale. Only applies to SnapshotSave.
+	AllowStale bool
+
+	// Op is the operation code for the RPC.
+	Op SnapshotOp
+}
+
+// SnapshotResponse is used as a header for a snapshot RPC response. This will
+// precede any streaming data that's part of the response and is
+// MessagePack-encoded on the wire.
+type SnapshotResponse struct {
+	// Error is the overall error status of the RPC request.
+	Error string
+
+	// QueryMeta has freshness information about the server that handled the
+	// request. It is only filled in for a SnapshotSave.
+ QueryMeta +} diff --git a/test/snapshot/corrupt-meta.tar b/test/snapshot/corrupt-meta.tar new file mode 100644 index 000000000..a3ab6f317 --- /dev/null +++ b/test/snapshot/corrupt-meta.tar @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:dedde68c81125318efc1293c933ddca7f3720c2e7c94c54b56e50efcbecd94c0 +size 4608 diff --git a/test/snapshot/corrupt-sha.tar b/test/snapshot/corrupt-sha.tar new file mode 100644 index 000000000..563074adc --- /dev/null +++ b/test/snapshot/corrupt-sha.tar @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:c9a3f99934859f8f8bce97344aeb0a012f24d5778920a7e8ad7ebc28d9981350 +size 4608 diff --git a/test/snapshot/corrupt-state.tar b/test/snapshot/corrupt-state.tar new file mode 100644 index 000000000..384444537 --- /dev/null +++ b/test/snapshot/corrupt-state.tar @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e4a4b7d00f150e1045a6a04e29977d20a7d926cb355d1ec06c18f3f3f6dd3663 +size 4608 diff --git a/test/snapshot/empty.tar b/test/snapshot/empty.tar new file mode 100644 index 000000000..abc8240e4 --- /dev/null +++ b/test/snapshot/empty.tar @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:5f70bf18a086007016e948b04aed3b82103a36bea41755b6cddfaf10ace3c6ef +size 1024 diff --git a/test/snapshot/extra.tar b/test/snapshot/extra.tar new file mode 100644 index 000000000..4a86723eb --- /dev/null +++ b/test/snapshot/extra.tar @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:50c3e729939fab88b174002a401fa4872f6795fa8b572699a931b218a37c4a03 +size 5120 diff --git a/test/snapshot/missing-meta.tar b/test/snapshot/missing-meta.tar new file mode 100644 index 000000000..247cd63c2 --- /dev/null +++ b/test/snapshot/missing-meta.tar @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:35508736d46046a6aee4fe7dd9d702ac2a5195d049a50eac7489e18378085ab3 +size 3584 diff --git a/test/snapshot/missing-sha.tar b/test/snapshot/missing-sha.tar new file mode 100644 index 000000000..71eff70a0 --- /dev/null +++ b/test/snapshot/missing-sha.tar @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:af1eb4be4f820c0fccf16fc0031c19583b70e7c250eac6180b83393cd8cbd5d9 +size 3584 diff --git a/test/snapshot/missing-state.tar b/test/snapshot/missing-state.tar new file mode 100644 index 000000000..e35c026c5 --- /dev/null +++ b/test/snapshot/missing-state.tar @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:f646701c86739c3cb2aee2697189e4e0a57683b360c4f59f2332b1bb7bc251f7 +size 3072 diff --git a/vendor/github.com/hashicorp/raft/Makefile b/vendor/github.com/hashicorp/raft/Makefile index 61499c507..49f829923 100644 --- a/vendor/github.com/hashicorp/raft/Makefile +++ b/vendor/github.com/hashicorp/raft/Makefile @@ -1,10 +1,10 @@ DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...) test: - go test -timeout=45s ./... + go test -timeout=60s ./... integ: test - INTEG_TESTS=yes go test -timeout=3s -run=Integ ./... + INTEG_TESTS=yes go test -timeout=5s -run=Integ ./... deps: go get -d -v ./... diff --git a/vendor/github.com/hashicorp/raft/api.go b/vendor/github.com/hashicorp/raft/api.go index ff14131c4..2fd78e784 100644 --- a/vendor/github.com/hashicorp/raft/api.go +++ b/vendor/github.com/hashicorp/raft/api.go @@ -3,6 +3,7 @@ package raft import ( "errors" "fmt" + "io" "log" "os" "strconv" @@ -25,6 +26,10 @@ var ( // because it's been deposed in the process. 
ErrLeadershipLost = errors.New("leadership lost while committing log") + // ErrAbortedByRestore is returned when a leader fails to commit a log + // entry because it's been superseded by a user snapshot restore. + ErrAbortedByRestore = errors.New("snapshot restored while committing log") + // ErrRaftShutdown is returned when operations are requested against an // inactive Raft. ErrRaftShutdown = errors.New("raft is already shutdown") @@ -64,11 +69,14 @@ type Raft struct { // FSM is the client state machine to apply commands to fsm FSM - // fsmCommitCh is used to trigger async application of logs to the fsm - fsmCommitCh chan commitTuple - - // fsmRestoreCh is used to trigger a restore from snapshot - fsmRestoreCh chan *restoreFuture + // fsmMutateCh is used to send state-changing updates to the FSM. This + // receives pointers to commitTuple structures when applying logs or + // pointers to restoreFuture structures when restoring a snapshot. We + // need control over the order of these operations when doing user + // restores so that we finish applying any old log applies before we + // take a user snapshot on the leader, otherwise we might restore the + // snapshot and apply old logs to it that were in the pipe. + fsmMutateCh chan interface{} // fsmSnapshotCh is used to trigger a new snapshot being taken fsmSnapshotCh chan *reqSnapshotFuture @@ -118,8 +126,12 @@ type Raft struct { // snapshots is used to store and retrieve snapshots snapshots SnapshotStore - // snapshotCh is used for user triggered snapshots - snapshotCh chan *snapshotFuture + // userSnapshotCh is used for user-triggered snapshots + userSnapshotCh chan *userSnapshotFuture + + // userRestoreCh is used for user-triggered restores of external + // snapshots + userRestoreCh chan *userRestoreFuture // stable is a StableStore implementation for durable state // It provides stable storage for many fields in raftState @@ -429,8 +441,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna applyCh: make(chan *logFuture), conf: *conf, fsm: fsm, - fsmCommitCh: make(chan commitTuple, 128), - fsmRestoreCh: make(chan *restoreFuture), + fsmMutateCh: make(chan interface{}, 128), fsmSnapshotCh: make(chan *reqSnapshotFuture), leaderCh: make(chan bool), localID: localID, @@ -441,7 +452,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna configurations: configurations{}, rpcCh: trans.Consumer(), snapshots: snaps, - snapshotCh: make(chan *snapshotFuture), + userSnapshotCh: make(chan *userSnapshotFuture), + userRestoreCh: make(chan *userRestoreFuture), shutdownCh: make(chan struct{}), stable: stable, trans: trans, @@ -792,18 +804,78 @@ func (r *Raft) Shutdown() Future { return &shutdownFuture{nil} } -// Snapshot is used to manually force Raft to take a snapshot. -// Returns a future that can be used to block until complete. -func (r *Raft) Snapshot() Future { - snapFuture := &snapshotFuture{} - snapFuture.init() +// Snapshot is used to manually force Raft to take a snapshot. Returns a future +// that can be used to block until complete, and that contains a function that +// can be used to open the snapshot. 
+func (r *Raft) Snapshot() SnapshotFuture { + future := &userSnapshotFuture{} + future.init() select { - case r.snapshotCh <- snapFuture: - return snapFuture + case r.userSnapshotCh <- future: + return future case <-r.shutdownCh: - return errorFuture{ErrRaftShutdown} + future.respond(ErrRaftShutdown) + return future + } +} + +// Restore is used to manually force Raft to consume an external snapshot, such +// as if restoring from a backup. We will use the current Raft configuration, +// not the one from the snapshot, so that we can restore into a new cluster. We +// will also use the higher of the index of the snapshot, or the current index, +// and then add 1 to that, so we force a new state with a hole in the Raft log, +// so that the snapshot will be sent to followers and used for any new joiners. +// This can only be run on the leader, and blocks until the restore is complete +// or an error occurs. +// +// WARNING! This operation has the leader take on the state of the snapshot and +// then sets itself up so that it replicates that to its followers though the +// install snapshot process. This involves a potentially dangerous period where +// the leader commits ahead of its followers, so should only be used for disaster +// recovery into a fresh cluster, and should not be used in normal operations. +func (r *Raft) Restore(meta *SnapshotMeta, reader io.Reader, timeout time.Duration) error { + metrics.IncrCounter([]string{"raft", "restore"}, 1) + var timer <-chan time.Time + if timeout > 0 { + timer = time.After(timeout) } + // Perform the restore. + restore := &userRestoreFuture{ + meta: meta, + reader: reader, + } + restore.init() + select { + case <-timer: + return ErrEnqueueTimeout + case <-r.shutdownCh: + return ErrRaftShutdown + case r.userRestoreCh <- restore: + // If the restore is ingested then wait for it to complete. + if err := restore.Error(); err != nil { + return err + } + } + + // Apply a no-op log entry. Waiting for this allows us to wait until the + // followers have gotten the restore and replicated at least this new + // entry, which shows that we've also faulted and installed the + // snapshot with the contents of the restore. + noop := &logFuture{ + log: Log{ + Type: LogNoop, + }, + } + noop.init() + select { + case <-timer: + return ErrEnqueueTimeout + case <-r.shutdownCh: + return ErrRaftShutdown + case r.applyCh <- noop: + return noop.Error() + } } // State is used to return the current raft state. @@ -870,7 +942,7 @@ func (r *Raft) Stats() map[string]string { "last_log_term": toString(lastLogTerm), "commit_index": toString(r.getCommitIndex()), "applied_index": toString(r.getLastApplied()), - "fsm_pending": toString(uint64(len(r.fsmCommitCh))), + "fsm_pending": toString(uint64(len(r.fsmMutateCh))), "last_snapshot_index": toString(lastSnapIndex), "last_snapshot_term": toString(lastSnapTerm), "protocol_version": toString(uint64(r.protocolVersion)), diff --git a/vendor/github.com/hashicorp/raft/fsm.go b/vendor/github.com/hashicorp/raft/fsm.go index 23da1e99b..c89986c0f 100644 --- a/vendor/github.com/hashicorp/raft/fsm.go +++ b/vendor/github.com/hashicorp/raft/fsm.go @@ -48,67 +48,87 @@ type FSMSnapshot interface { // the FSM to block our internal operations. 
func (r *Raft) runFSM() { var lastIndex, lastTerm uint64 + + commit := func(req *commitTuple) { + // Apply the log if a command + var resp interface{} + if req.log.Type == LogCommand { + start := time.Now() + resp = r.fsm.Apply(req.log) + metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start) + } + + // Update the indexes + lastIndex = req.log.Index + lastTerm = req.log.Term + + // Invoke the future if given + if req.future != nil { + req.future.response = resp + req.future.respond(nil) + } + } + + restore := func(req *restoreFuture) { + // Open the snapshot + meta, source, err := r.snapshots.Open(req.ID) + if err != nil { + req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err)) + return + } + + // Attempt to restore + start := time.Now() + if err := r.fsm.Restore(source); err != nil { + req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err)) + source.Close() + return + } + source.Close() + metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start) + + // Update the last index and term + lastIndex = meta.Index + lastTerm = meta.Term + req.respond(nil) + } + + snapshot := func(req *reqSnapshotFuture) { + // Is there something to snapshot? + if lastIndex == 0 { + req.respond(ErrNothingNewToSnapshot) + return + } + + // Start a snapshot + start := time.Now() + snap, err := r.fsm.Snapshot() + metrics.MeasureSince([]string{"raft", "fsm", "snapshot"}, start) + + // Respond to the request + req.index = lastIndex + req.term = lastTerm + req.snapshot = snap + req.respond(err) + } + for { select { - case req := <-r.fsmRestoreCh: - // Open the snapshot - meta, source, err := r.snapshots.Open(req.ID) - if err != nil { - req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err)) - continue - } + case ptr := <-r.fsmMutateCh: + switch req := ptr.(type) { + case *commitTuple: + commit(req) - // Attempt to restore - start := time.Now() - if err := r.fsm.Restore(source); err != nil { - req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err)) - source.Close() - continue - } - source.Close() - metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start) + case *restoreFuture: + restore(req) - // Update the last index and term - lastIndex = meta.Index - lastTerm = meta.Term - req.respond(nil) + default: + panic(fmt.Errorf("bad type passed to fsmMutateCh: %#v", ptr)) + } case req := <-r.fsmSnapshotCh: - // Is there something to snapshot? 
- if lastIndex == 0 { - req.respond(ErrNothingNewToSnapshot) - continue - } + snapshot(req) - // Start a snapshot - start := time.Now() - snap, err := r.fsm.Snapshot() - metrics.MeasureSince([]string{"raft", "fsm", "snapshot"}, start) - - // Respond to the request - req.index = lastIndex - req.term = lastTerm - req.snapshot = snap - req.respond(err) - - case commitEntry := <-r.fsmCommitCh: - // Apply the log if a command - var resp interface{} - if commitEntry.log.Type == LogCommand { - start := time.Now() - resp = r.fsm.Apply(commitEntry.log) - metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start) - } - - // Update the indexes - lastIndex = commitEntry.log.Index - lastTerm = commitEntry.log.Term - - // Invoke the future if given - if commitEntry.future != nil { - commitEntry.future.response = resp - commitEntry.future.respond(nil) - } case <-r.shutdownCh: return } diff --git a/vendor/github.com/hashicorp/raft/future.go b/vendor/github.com/hashicorp/raft/future.go index 67c74fc42..fac59a5cc 100644 --- a/vendor/github.com/hashicorp/raft/future.go +++ b/vendor/github.com/hashicorp/raft/future.go @@ -1,6 +1,8 @@ package raft import ( + "fmt" + "io" "sync" "time" ) @@ -46,6 +48,16 @@ type ConfigurationFuture interface { Configuration() Configuration } +// SnapshotFuture is used for waiting on a user-triggered snapshot to complete. +type SnapshotFuture interface { + Future + + // Open is a function you can call to access the underlying snapshot and + // its metadata. This must not be called until after the Error method + // has returned. + Open() (*SnapshotMeta, io.ReadCloser, error) +} + // errorFuture is used to return a static error. type errorFuture struct { err error @@ -150,9 +162,41 @@ func (s *shutdownFuture) Error() error { return nil } -// snapshotFuture is used for waiting on a snapshot to complete. -type snapshotFuture struct { +// userSnapshotFuture is used for waiting on a user-triggered snapshot to +// complete. +type userSnapshotFuture struct { deferError + + // opener is a function used to open the snapshot. This is filled in + // once the future returns with no error. + opener func() (*SnapshotMeta, io.ReadCloser, error) +} + +// Open is a function you can call to access the underlying snapshot and its +// metadata. +func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) { + if u.opener == nil { + return nil, nil, fmt.Errorf("no snapshot available") + } else { + // Invalidate the opener so it can't get called multiple times, + // which isn't generally safe. + defer func() { + u.opener = nil + }() + return u.opener() + } +} + +// userRestoreFuture is used for waiting on a user-triggered restore of an +// external snapshot to complete. +type userRestoreFuture struct { + deferError + + // meta is the metadata that belongs with the snapshot. + meta *SnapshotMeta + + // reader is the interface to read the snapshot contents from. + reader io.Reader } // reqSnapshotFuture is used for requesting a snapshot start. 
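Together, the future changes above expose user-triggered snapshots as a two-step operation: wait on the future, then open the result exactly once. Below is a minimal sketch of how a caller might drive the new API, assuming a running `*raft.Raft`; the `saveUserSnapshot` helper itself is hypothetical and not part of this change:

```go
package example

import (
	"fmt"
	"io"

	"github.com/hashicorp/raft"
)

// saveUserSnapshot is a hypothetical helper that takes a snapshot, waits
// for it to complete, and streams its contents to the given writer.
func saveUserSnapshot(r *raft.Raft, out io.Writer) (*raft.SnapshotMeta, error) {
	// Snapshot() queues the request; Error() blocks until it completes.
	future := r.Snapshot()
	if err := future.Error(); err != nil {
		return nil, fmt.Errorf("failed to take snapshot: %v", err)
	}

	// Open() must not be called until Error() has returned, and the
	// opener is invalidated after the first call.
	meta, source, err := future.Open()
	if err != nil {
		return nil, fmt.Errorf("failed to open snapshot: %v", err)
	}
	defer source.Close()

	// Stream the raw snapshot contents out to the caller.
	if _, err := io.Copy(out, source); err != nil {
		return nil, fmt.Errorf("failed to stream snapshot: %v", err)
	}
	return meta, nil
}
```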
diff --git a/vendor/github.com/hashicorp/raft/raft.go b/vendor/github.com/hashicorp/raft/raft.go index a6c729413..aa8fe8208 100644 --- a/vendor/github.com/hashicorp/raft/raft.go +++ b/vendor/github.com/hashicorp/raft/raft.go @@ -160,6 +160,10 @@ func (r *Raft) runFollower() { // Reject any operations since we are not the leader v.respond(ErrNotLeader) + case r := <-r.userRestoreCh: + // Reject any restores since we are not the leader + r.respond(ErrNotLeader) + case c := <-r.configurationsCh: c.configurations = r.configurations.Clone() c.respond(nil) @@ -283,6 +287,10 @@ func (r *Raft) runCandidate() { // Reject any operations since we are not the leader v.respond(ErrNotLeader) + case r := <-r.userRestoreCh: + // Reject any restores since we are not the leader + r.respond(ErrNotLeader) + case c := <-r.configurationsCh: c.configurations = r.configurations.Clone() c.respond(nil) @@ -550,6 +558,10 @@ func (r *Raft) leaderLoop() { v.respond(nil) } + case future := <-r.userRestoreCh: + err := r.restoreUserSnapshot(future.meta, future.reader) + future.respond(err) + case c := <-r.configurationsCh: c.configurations = r.configurations.Clone() c.respond(nil) @@ -680,6 +692,102 @@ func (r *Raft) quorumSize() int { return voters/2 + 1 } +// restoreUserSnapshot is used to manually consume an external snapshot, such +// as if restoring from a backup. We will use the current Raft configuration, +// not the one from the snapshot, so that we can restore into a new cluster. We +// will also use the higher of the index of the snapshot, or the current index, +// and then add 1 to that, so we force a new state with a hole in the Raft log, +// so that the snapshot will be sent to followers and used for any new joiners. +// This can only be run on the leader, and returns a future that can be used to +// block until complete. +func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error { + defer metrics.MeasureSince([]string{"raft", "restoreUserSnapshot"}, time.Now()) + + // Sanity check the version. + version := meta.Version + if version < SnapshotVersionMin || version > SnapshotVersionMax { + return fmt.Errorf("unsupported snapshot version %d", version) + } + + // We don't support snapshots while there's a config change + // outstanding since the snapshot doesn't have a means to + // represent this state. + committedIndex := r.configurations.committedIndex + latestIndex := r.configurations.latestIndex + if committedIndex != latestIndex { + return fmt.Errorf("cannot restore snapshot now, wait until the configuration entry at %v has been applied (have applied %v)", + latestIndex, committedIndex) + } + + // Cancel any inflight requests. + for { + e := r.leaderState.inflight.Front() + if e == nil { + break + } + e.Value.(*logFuture).respond(ErrAbortedByRestore) + r.leaderState.inflight.Remove(e) + } + + // We will overwrite the snapshot metadata with the current term, + // an index that's greater than the current index, or the last + // index in the snapshot. It's important that we leave a hole in + // the index so we know there's nothing in the Raft log there and + // replication will fault and send the snapshot. + term := r.getCurrentTerm() + lastIndex := r.getLastIndex() + if meta.Index > lastIndex { + lastIndex = meta.Index + } + lastIndex++ + + // Dump the snapshot. Note that we use the latest configuration, + // not the one that came with the snapshot. 
+ sink, err := r.snapshots.Create(version, lastIndex, term, + r.configurations.latest, r.configurations.latestIndex, r.trans) + if err != nil { + return fmt.Errorf("failed to create snapshot: %v", err) + } + n, err := io.Copy(sink, reader) + if err != nil { + sink.Cancel() + return fmt.Errorf("failed to write snapshot: %v", err) + } + if n != meta.Size { + sink.Cancel() + return fmt.Errorf("failed to write snapshot, size didn't match (%d != %d)", n, meta.Size) + } + if err := sink.Close(); err != nil { + return fmt.Errorf("failed to close snapshot: %v", err) + } + r.logger.Printf("[INFO] raft: Copied %d bytes to local snapshot", n) + + // Restore the snapshot into the FSM. If this fails we are in a + // bad state so we panic to take ourselves out. + fsm := &restoreFuture{ID: sink.ID()} + fsm.init() + select { + case r.fsmMutateCh <- fsm: + case <-r.shutdownCh: + return ErrRaftShutdown + } + if err := fsm.Error(); err != nil { + panic(fmt.Errorf("failed to restore snapshot: %v", err)) + } + + // We set the last log so it looks like we've stored the empty + // index we burned. The last applied is set because we made the + // FSM take the snapshot state, and we store the last snapshot + // in the stable store since we created a snapshot as part of + // this process. + r.setLastLog(lastIndex, term) + r.setLastApplied(lastIndex) + r.setLastSnapshot(lastIndex, term) + + r.logger.Printf("[INFO] raft: Restored user snapshot (index %d)", lastIndex) + return nil +} + // appendConfigurationEntry changes the configuration and adds a new // configuration entry to the log. This must only be called from the // main thread. @@ -804,7 +912,7 @@ func (r *Raft) processLog(l *Log, future *logFuture) { case LogCommand: // Forward to the fsm handler select { - case r.fsmCommitCh <- commitTuple{l, future}: + case r.fsmMutateCh <- &commitTuple{l, future}: case <-r.shutdownCh: if future != nil { future.respond(ErrRaftShutdown) @@ -1204,7 +1312,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { future := &restoreFuture{ID: sink.ID()} future.init() select { - case r.fsmRestoreCh <- future: + case r.fsmMutateCh <- future: case <-r.shutdownCh: future.respond(ErrRaftShutdown) return diff --git a/vendor/github.com/hashicorp/raft/snapshot.go b/vendor/github.com/hashicorp/raft/snapshot.go index 8402e0938..5287ebc41 100644 --- a/vendor/github.com/hashicorp/raft/snapshot.go +++ b/vendor/github.com/hashicorp/raft/snapshot.go @@ -76,15 +76,19 @@ func (r *Raft) runSnapshots() { } // Trigger a snapshot - if err := r.takeSnapshot(); err != nil { + if _, err := r.takeSnapshot(); err != nil { r.logger.Printf("[ERR] raft: Failed to take snapshot: %v", err) } - case future := <-r.snapshotCh: + case future := <-r.userSnapshotCh: // User-triggered, run immediately - err := r.takeSnapshot() + id, err := r.takeSnapshot() if err != nil { r.logger.Printf("[ERR] raft: Failed to take snapshot: %v", err) + } else { + future.opener = func() (*SnapshotMeta, io.ReadCloser, error) { + return r.snapshots.Open(id) + } } future.respond(err) @@ -113,8 +117,9 @@ func (r *Raft) shouldSnapshot() bool { } // takeSnapshot is used to take a new snapshot. This must only be called from -// the snapshot thread, never the main thread. -func (r *Raft) takeSnapshot() error { +// the snapshot thread, never the main thread. This returns the ID of the new +// snapshot, along with an error. 
+func (r *Raft) takeSnapshot() (string, error) {
 	defer metrics.MeasureSince([]string{"raft", "snapshot", "takeSnapshot"}, time.Now())
 
 	// Create a request for the FSM to perform a snapshot.
@@ -125,7 +130,7 @@
 	select {
 	case r.fsmSnapshotCh <- snapReq:
 	case <-r.shutdownCh:
-		return ErrRaftShutdown
+		return "", ErrRaftShutdown
 	}
 
 	// Wait until we get a response
@@ -133,7 +138,7 @@
 		if err != ErrNothingNewToSnapshot {
 			err = fmt.Errorf("failed to start snapshot: %v", err)
 		}
-		return err
+		return "", err
 	}
 	defer snapReq.snapshot.Release()
@@ -145,10 +150,10 @@
 	select {
 	case r.configurationsCh <- configReq:
 	case <-r.shutdownCh:
-		return ErrRaftShutdown
+		return "", ErrRaftShutdown
 	}
 	if err := configReq.Error(); err != nil {
-		return err
+		return "", err
 	}
 	committed := configReq.configurations.committed
 	committedIndex := configReq.configurations.committedIndex
@@ -162,7 +167,7 @@
 	// then it's not crucial that we snapshot, since there's not much going
 	// on Raft-wise.
 	if snapReq.index < committedIndex {
-		return fmt.Errorf("cannot take snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
+		return "", fmt.Errorf("cannot take snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
 			committedIndex, snapReq.index)
 	}
@@ -172,7 +177,7 @@
 	version := getSnapshotVersion(r.protocolVersion)
 	sink, err := r.snapshots.Create(version, snapReq.index, snapReq.term, committed, committedIndex, r.trans)
 	if err != nil {
-		return fmt.Errorf("failed to create snapshot: %v", err)
+		return "", fmt.Errorf("failed to create snapshot: %v", err)
 	}
 	metrics.MeasureSince([]string{"raft", "snapshot", "create"}, start)
@@ -180,13 +185,13 @@
 	start = time.Now()
 	if err := snapReq.snapshot.Persist(sink); err != nil {
 		sink.Cancel()
-		return fmt.Errorf("failed to persist snapshot: %v", err)
+		return "", fmt.Errorf("failed to persist snapshot: %v", err)
 	}
 	metrics.MeasureSince([]string{"raft", "snapshot", "persist"}, start)
 
 	// Close and check for error.
 	if err := sink.Close(); err != nil {
-		return fmt.Errorf("failed to close snapshot: %v", err)
+		return "", fmt.Errorf("failed to close snapshot: %v", err)
 	}
 
 	// Update the last stable snapshot info.
@@ -194,11 +199,11 @@
 
 	// Compact the logs.
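+	// (Entries up to the snapshot index are now covered by the snapshot
+	// itself; the library retains a trailing window of logs so followers
+	// that are only slightly behind can still catch up from the log.)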
 	if err := r.compactLogs(snapReq.index); err != nil {
-		return err
+		return "", err
 	}
 
 	r.logger.Printf("[INFO] raft: Snapshot to %d complete", snapReq.index)
-	return nil
+	return sink.ID(), nil
 }
 
 // compactLogs takes the last inclusive index of a snapshot
diff --git a/vendor/vendor.json b/vendor/vendor.json
index c0cce02bb..1ed2256a3 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -350,10 +350,10 @@
 			"revisionTime": "2015-11-16T02:03:38Z"
 		},
 		{
-			"checksumSHA1": "zMgiTV0dfJIQNRCJDF50bLomDvg=",
+			"checksumSHA1": "tHzyGCXkf8PnmBrTk1Z01JIv/5Q=",
 			"path": "github.com/hashicorp/raft",
-			"revision": "c69c15dd73b6695ba75b3502ce6b332cc0042c83",
-			"revisionTime": "2016-08-01T21:27:18Z"
+			"revision": "e1d3debe52b9152e8db5c3a77b7f7cf9b2a8b404",
+			"revisionTime": "2016-10-26T00:17:15Z"
 		},
 		{
 			"checksumSHA1": "QAxukkv54/iIvLfsUP6IK4R0m/A=",
diff --git a/website/source/docs/agent/http.html.markdown b/website/source/docs/agent/http.html.markdown
index ee10cc1be..80d812f97 100644
--- a/website/source/docs/agent/http.html.markdown
+++ b/website/source/docs/agent/http.html.markdown
@@ -24,6 +24,7 @@ Each endpoint manages a different aspect of Consul:
 * [operator](http/operator.html) - Consul Operator Tools
 * [query](http/query.html) - Prepared Queries
 * [session](http/session.html) - Sessions
+* [snapshots](http/snapshot.html) - Consul Snapshots for Disaster Recovery
 * [status](http/status.html) - Consul System Status
 
 Each of these is documented in detail at the links above. Consul also has a number
diff --git a/website/source/docs/agent/http/snapshot.html.markdown b/website/source/docs/agent/http/snapshot.html.markdown
new file mode 100644
index 000000000..25fd2d49b
--- /dev/null
+++ b/website/source/docs/agent/http/snapshot.html.markdown
@@ -0,0 +1,97 @@
+---
+layout: "docs"
+page_title: "Consul Snapshots (HTTP)"
+sidebar_current: "docs-agent-http-snapshots"
+description: >
+  The Snapshot endpoints are used to save and restore Consul's server state for disaster recovery.
+---
+
+# Snapshot HTTP Endpoint
+
+The Snapshot endpoints are used to save and restore the state of the Consul
+servers for disaster recovery. Snapshots include all state managed by Consul's
+Raft [consensus protocol](/docs/internals/consensus.html), including:
+
+* Key/Value Entries
+* Service Catalog
+* Prepared Queries
+* Sessions
+* ACLs
+
+Available in Consul 0.7.1 and later, these endpoints allow for atomic,
+point-in-time snapshots of the above data in a format that can be saved
+externally. Snapshots can then be used to restore the server state into a fresh
+cluster of Consul servers in the event of a disaster.
+
+The following endpoints are supported:
+
+* [`/v1/snapshot`](#snapshot): Save and restore Consul server state
+
+These endpoints do not support blocking queries. Saving snapshots uses
+consistent mode by default, and stale mode is also supported.
+
+The endpoints support the use of ACL Tokens. Because snapshots contain all
+server state, including ACLs, a management token is required to perform snapshot
+operations if ACLs are enabled.
+
+-> Snapshot operations are not available for servers running in
+   [dev mode](/docs/agent/options.html#_dev).
+
+### /v1/snapshot
+
+The snapshot endpoint supports the `GET` and `PUT` methods.
+
+#### GET Method
+
+When using the `GET` method, Consul will perform an atomic, point-in-time
+snapshot of the Consul server state.
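+
+For example, a snapshot can be saved with any HTTP client; an illustrative
+curl invocation, assuming an agent at the default `127.0.0.1:8500` address:
+
+```text
+# Illustrative only; adjust the address for your agent.
+$ curl -o backup.snap http://127.0.0.1:8500/v1/snapshot
+```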
+
+Snapshots are exposed as gzipped tar archives which internally contain the Raft
+metadata required to restore, as well as a binary serialized version of the Consul
+server state. The contents are covered internally by SHA-256 hashes. These hashes
+are verified during snapshot restore operations. The structure of the archive is
+internal to Consul and not intended to be used other than for restore operations.
+In particular, the archives are not designed to be modified before a restore.
+
+By default, the datacenter of the agent is queried; however, the `dc` can be
+provided using the "?dc=" query parameter.
+
+By default, snapshots use consistent mode, which means the request is internally
+forwarded to the cluster leader, and leadership is checked before performing the
+snapshot. If `stale` is specified using the "?stale" query parameter, then any
+server can handle the request and the results may be arbitrarily stale. To support
+bounding the acceptable staleness of snapshots, responses provide the `X-Consul-LastContact`
+header containing the time in milliseconds that a server was last contacted by
+the leader node. The `X-Consul-KnownLeader` header also indicates if there is a
+known leader. These can be used by clients to gauge the staleness of a snapshot
+and take appropriate action. The stale mode is particularly useful for taking a
+snapshot of a cluster in a failed state with no current leader.
+
+If ACLs are enabled, the client will need to supply an ACL Token with management
+privileges.
+
+The return code is 200 on success, and the snapshot will be returned in the body
+as a gzipped tar archive. In addition to the stale-related headers described above,
+the `X-Consul-Index` header will also be set to the index at which the snapshot took
+place.
+
+#### PUT Method
+
+When using the `PUT` method, Consul will atomically restore a point-in-time
+snapshot of the Consul server state.
+
+Restores involve a potentially dangerous low-level Raft operation that is not
+designed to handle server failures during a restore. This operation is primarily
+intended to be used when recovering from a disaster, restoring into a fresh
+cluster of Consul servers.
+
+By default, the datacenter of the agent is targeted; however, the `dc` can be
+provided using the "?dc=" query parameter.
+
+If ACLs are enabled, the client will need to supply an ACL Token with management
+privileges.
+
+The body of the request should be a snapshot archive returned from a previous call
+to the `GET` method.
+
+The return code is 200 on success.
diff --git a/website/source/docs/commands/kv.html.markdown b/website/source/docs/commands/kv.html.markdown
index 24258003e..efc50572c 100644
--- a/website/source/docs/commands/kv.html.markdown
+++ b/website/source/docs/commands/kv.html.markdown
@@ -14,7 +14,7 @@ and deleting from the store.
 This command is available in Consul 0.7.1 and later.
 
 The key-value store is also accessible via the
-[HTTP API](docs/agent/http/kv.html).
+[HTTP API](/docs/agent/http/kv.html).
 
 ## Usage
diff --git a/website/source/docs/commands/snapshot.html.markdown b/website/source/docs/commands/snapshot.html.markdown
new file mode 100644
index 000000000..9f9ece331
--- /dev/null
+++ b/website/source/docs/commands/snapshot.html.markdown
@@ -0,0 +1,59 @@
+---
+layout: "docs"
+page_title: "Commands: Snapshot"
+sidebar_current: "docs-commands-snapshot"
+---
+
+# Consul Snapshot
+
+Command: `consul snapshot`
+
+The `snapshot` command has subcommands for saving and restoring the state of the
+Consul servers for disaster recovery. These are atomic, point-in-time snapshots
+which include key/value entries, service catalog, prepared queries, sessions, and
+ACLs. This command is available in Consul 0.7.1 and later.
+
+Snapshots are also accessible via the [HTTP API](/docs/agent/http/snapshot.html).
+
+## Usage
+
+Usage: `consul snapshot <subcommand>`
+
+For the exact documentation for your Consul version, run `consul snapshot -h` to
+view the complete list of subcommands.
+
+```text
+Usage: consul snapshot <subcommand> [options] [args]
+
+  # ...
+
+Subcommands:
+
+    restore    Restores snapshot of Consul server state
+    save       Saves snapshot of Consul server state
+```
+
+For more information, examples, and usage about a subcommand, click on the name
+of the subcommand in the sidebar or one of the links below:
+
+- [restore](/docs/commands/snapshot/restore.html)
+- [save](/docs/commands/snapshot/save.html)
+
+## Basic Examples
+
+To create a snapshot and save it as a file called "backup.snap":
+
+```text
+$ consul snapshot save backup.snap
+Saved and verified snapshot to index 8419
+```
+
+To restore a snapshot from a file called "backup.snap":
+
+```text
+$ consul snapshot restore backup.snap
+Restored snapshot
+```
+
+For more examples, ask for subcommand help or view the subcommand documentation
+by clicking on one of the links in the sidebar.
diff --git a/website/source/docs/commands/snapshot/restore.html.markdown.erb b/website/source/docs/commands/snapshot/restore.html.markdown.erb
new file mode 100644
index 000000000..1f5018f87
--- /dev/null
+++ b/website/source/docs/commands/snapshot/restore.html.markdown.erb
@@ -0,0 +1,42 @@
+---
+layout: "docs"
+page_title: "Commands: Snapshot Restore"
+sidebar_current: "docs-commands-snapshot-restore"
+---
+
+# Consul Snapshot Restore
+
+Command: `consul snapshot restore`
+
+The `snapshot restore` command is used to restore an atomic, point-in-time
+snapshot of the state of the Consul servers which includes key/value entries,
+service catalog, prepared queries, sessions, and ACLs. The snapshot is read
+from the given file.
+
+Restores involve a potentially dangerous low-level Raft operation that is not
+designed to handle server failures during a restore. This command is primarily
+intended to be used when recovering from a disaster, restoring into a fresh
+cluster of Consul servers.
+
+If ACLs are enabled, a management token must be supplied in order to perform
+a snapshot restore.
+
+## Usage
+
+Usage: `consul snapshot restore [options] FILE`
+
+#### API Options
+
+<%= partial "docs/commands/http_api_options" %>
+
+## Examples
+
+To restore a snapshot from the file "backup.snap":
+
+```text
+$ consul snapshot restore backup.snap
+Restored snapshot
+```
+
+Please see the [HTTP API](/docs/agent/http/snapshot.html) documentation for
+more details about snapshot internals.
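+
+As an illustration of that API (assuming an agent at the default
+`127.0.0.1:8500` address), the same restore can be performed with curl:
+
+```text
+# Illustrative only; adjust the address for your agent.
+$ curl -X PUT --data-binary @backup.snap http://127.0.0.1:8500/v1/snapshot
+```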
diff --git a/website/source/docs/commands/snapshot/save.html.markdown.erb b/website/source/docs/commands/snapshot/save.html.markdown.erb
new file mode 100644
index 000000000..bb353eb28
--- /dev/null
+++ b/website/source/docs/commands/snapshot/save.html.markdown.erb
@@ -0,0 +1,56 @@
+---
+layout: "docs"
+page_title: "Commands: Snapshot Save"
+sidebar_current: "docs-commands-snapshot-save"
+---
+
+# Consul Snapshot Save
+
+Command: `consul snapshot save`
+
+The `snapshot save` command is used to retrieve an atomic, point-in-time snapshot
+of the state of the Consul servers which includes key/value entries,
+service catalog, prepared queries, sessions, and ACLs. The snapshot is saved to
+the given file.
+
+If ACLs are enabled, a management token must be supplied in order to perform
+a snapshot save.
+
+## Usage
+
+Usage: `consul snapshot save [options] FILE`
+
+#### API Options
+
+<%= partial "docs/commands/http_api_options" %>
+
+## Examples
+
+To create a snapshot from the leader server and save it to "backup.snap":
+
+```text
+$ consul snapshot save backup.snap
+Saved and verified snapshot to index 8419
+```
+
+By default, snapshots are taken using a consistent mode that forwards requests
+to the leader, and the leader verifies that it still holds leadership before
+taking the snapshot.
+
+After the snapshot is written to the given file, it is read back and verified
+for integrity.
+
+To create a potentially stale snapshot from any available server, use the stale
+consistency mode:
+
+```text
+$ consul snapshot save -stale backup.snap
+# ...
+```
+
+This is useful for situations where a cluster is in a degraded state and no
+leader is available. To target a specific server for a snapshot, you can run
+the `consul snapshot save` command on that specific server.
+
+Please see the [HTTP API](/docs/agent/http/snapshot.html) documentation for
+more details about snapshot internals.
diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb
index 7c5689108..0b7a00ec5 100644
--- a/website/source/layouts/docs.erb
+++ b/website/source/layouts/docs.erb
@@ -149,6 +149,18 @@ rtt + > + snapshot + + + > watch @@ -198,7 +210,7 @@ > - Operator + Operator > @@ -209,6 +221,10 @@ Sessions + > + Snapshots + + > Status