package api

import (
	"archive/tar"
	"compress/gzip"
	"context"
	"encoding/json"
	"errors"
	"io"
	"io/ioutil"
	"net/http"
	"sync"
	"time"

	"github.com/hashicorp/go-secure-stdlib/parseutil"
	"github.com/mitchellh/mapstructure"
)

// ErrIncompleteSnapshot is returned by RaftSnapshotWithContext when the
// downloaded archive did not yield a non-empty SHA256SUMS.sealed entry.
var ErrIncompleteSnapshot = errors.New("incomplete snapshot, unable to read SHA256SUMS.sealed file")

// RaftJoinResponse represents the response of the raft join API.
type RaftJoinResponse struct {
	Joined bool `json:"joined"`
}

// RaftJoinRequest represents the parameters consumed by the raft join API.
// It is sent as the JSON body of the join request.
type RaftJoinRequest struct {
	AutoJoin         string `json:"auto_join"`
	AutoJoinScheme   string `json:"auto_join_scheme"`
	AutoJoinPort     uint   `json:"auto_join_port"`
	LeaderAPIAddr    string `json:"leader_api_addr"`
	LeaderCACert     string `json:"leader_ca_cert"`
	LeaderClientCert string `json:"leader_client_cert"`
	LeaderClientKey  string `json:"leader_client_key"`
	Retry            bool   `json:"retry"`
	NonVoter         bool   `json:"non_voter"`
}

// AutopilotConfig is used for querying/setting the Autopilot configuration.
type AutopilotConfig struct { CleanupDeadServers bool `json:"cleanup_dead_servers" mapstructure:"cleanup_dead_servers"` LastContactThreshold time.Duration `json:"last_contact_threshold" mapstructure:"-"` DeadServerLastContactThreshold time.Duration `json:"dead_server_last_contact_threshold" mapstructure:"-"` MaxTrailingLogs uint64 `json:"max_trailing_logs" mapstructure:"max_trailing_logs"` MinQuorum uint `json:"min_quorum" mapstructure:"min_quorum"` ServerStabilizationTime time.Duration `json:"server_stabilization_time" mapstructure:"-"` DisableUpgradeMigration bool `json:"disable_upgrade_migration" mapstructure:"disable_upgrade_migration"` } // MarshalJSON makes the autopilot config fields JSON compatible func (ac *AutopilotConfig) MarshalJSON() ([]byte, error) { return json.Marshal(map[string]interface{}{ "cleanup_dead_servers": ac.CleanupDeadServers, "last_contact_threshold": ac.LastContactThreshold.String(), "dead_server_last_contact_threshold": ac.DeadServerLastContactThreshold.String(), "max_trailing_logs": ac.MaxTrailingLogs, "min_quorum": ac.MinQuorum, "server_stabilization_time": ac.ServerStabilizationTime.String(), "disable_upgrade_migration": ac.DisableUpgradeMigration, }) } // UnmarshalJSON parses the autopilot config JSON blob func (ac *AutopilotConfig) UnmarshalJSON(b []byte) error { var data interface{} err := json.Unmarshal(b, &data) if err != nil { return err } conf := data.(map[string]interface{}) if err = mapstructure.WeakDecode(conf, ac); err != nil { return err } if ac.LastContactThreshold, err = parseutil.ParseDurationSecond(conf["last_contact_threshold"]); err != nil { return err } if ac.DeadServerLastContactThreshold, err = parseutil.ParseDurationSecond(conf["dead_server_last_contact_threshold"]); err != nil { return err } if ac.ServerStabilizationTime, err = parseutil.ParseDurationSecond(conf["server_stabilization_time"]); err != nil { return err } return nil } // AutopilotState represents the response of the raft autopilot state API type 
AutopilotState struct { Healthy bool `mapstructure:"healthy"` FailureTolerance int `mapstructure:"failure_tolerance"` Servers map[string]*AutopilotServer `mapstructure:"servers"` Leader string `mapstructure:"leader"` Voters []string `mapstructure:"voters"` NonVoters []string `mapstructure:"non_voters"` RedundancyZones map[string]AutopilotZone `mapstructure:"redundancy_zones,omitempty"` Upgrade *AutopilotUpgrade `mapstructure:"upgrade_info,omitempty"` OptimisticFailureTolerance int `mapstructure:"optimistic_failure_tolerance,omitempty"` } // AutopilotServer represents the server blocks in the response of the raft // autopilot state API. type AutopilotServer struct { ID string `mapstructure:"id"` Name string `mapstructure:"name"` Address string `mapstructure:"address"` NodeStatus string `mapstructure:"node_status"` LastContact string `mapstructure:"last_contact"` LastTerm uint64 `mapstructure:"last_term"` LastIndex uint64 `mapstructure:"last_index"` Healthy bool `mapstructure:"healthy"` StableSince string `mapstructure:"stable_since"` Status string `mapstructure:"status"` Version string `mapstructure:"version"` UpgradeVersion string `mapstructure:"upgrade_version,omitempty"` RedundancyZone string `mapstructure:"redundancy_zone,omitempty"` NodeType string `mapstructure:"node_type,omitempty"` } type AutopilotZone struct { Servers []string `mapstructure:"servers,omitempty"` Voters []string `mapstructure:"voters,omitempty"` FailureTolerance int `mapstructure:"failure_tolerance,omitempty"` } type AutopilotUpgrade struct { Status string `mapstructure:"status"` TargetVersion string `mapstructure:"target_version,omitempty"` TargetVersionVoters []string `mapstructure:"target_version_voters,omitempty"` TargetVersionNonVoters []string `mapstructure:"target_version_non_voters,omitempty"` TargetVersionReadReplicas []string `mapstructure:"target_version_read_replicas,omitempty"` OtherVersionVoters []string `mapstructure:"other_version_voters,omitempty"` OtherVersionNonVoters 
[]string `mapstructure:"other_version_non_voters,omitempty"` OtherVersionReadReplicas []string `mapstructure:"other_version_read_replicas,omitempty"` RedundancyZones map[string]AutopilotZoneUpgradeVersions `mapstructure:"redundancy_zones,omitempty"` } type AutopilotZoneUpgradeVersions struct { TargetVersionVoters []string `mapstructure:"target_version_voters,omitempty"` TargetVersionNonVoters []string `mapstructure:"target_version_non_voters,omitempty"` OtherVersionVoters []string `mapstructure:"other_version_voters,omitempty"` OtherVersionNonVoters []string `mapstructure:"other_version_non_voters,omitempty"` } // RaftJoin wraps RaftJoinWithContext using context.Background. func (c *Sys) RaftJoin(opts *RaftJoinRequest) (*RaftJoinResponse, error) { return c.RaftJoinWithContext(context.Background(), opts) } // RaftJoinWithContext adds the node from which this call is invoked from to the raft // cluster represented by the leader address in the parameter. func (c *Sys) RaftJoinWithContext(ctx context.Context, opts *RaftJoinRequest) (*RaftJoinResponse, error) { ctx, cancelFunc := c.c.withConfiguredTimeout(ctx) defer cancelFunc() r := c.c.NewRequest(http.MethodPost, "/v1/sys/storage/raft/join") if err := r.SetJSONBody(opts); err != nil { return nil, err } resp, err := c.c.rawRequestWithContext(ctx, r) if err != nil { return nil, err } defer resp.Body.Close() var result RaftJoinResponse err = resp.DecodeJSON(&result) return &result, err } // RaftSnapshot wraps RaftSnapshotWithContext using context.Background. func (c *Sys) RaftSnapshot(snapWriter io.Writer) error { return c.RaftSnapshotWithContext(context.Background(), snapWriter) } // RaftSnapshotWithContext invokes the API that takes the snapshot of the raft cluster and // writes it to the supplied io.Writer. 
func (c *Sys) RaftSnapshotWithContext(ctx context.Context, snapWriter io.Writer) error { r := c.c.NewRequest(http.MethodGet, "/v1/sys/storage/raft/snapshot") r.URL.RawQuery = r.Params.Encode() resp, err := c.c.httpRequestWithContext(ctx, r) if err != nil { return err } defer resp.Body.Close() // Make sure that the last file in the archive, SHA256SUMS.sealed, is present // and non-empty. This is to catch cases where the snapshot failed midstream, // e.g. due to a problem with the seal that prevented encryption of that file. var wg sync.WaitGroup wg.Add(1) var verified bool rPipe, wPipe := io.Pipe() dup := io.TeeReader(resp.Body, wPipe) go func() { defer func() { io.Copy(ioutil.Discard, rPipe) rPipe.Close() wg.Done() }() uncompressed, err := gzip.NewReader(rPipe) if err != nil { return } t := tar.NewReader(uncompressed) var h *tar.Header for { h, err = t.Next() if err != nil { return } if h.Name != "SHA256SUMS.sealed" { continue } var b []byte b, err = ioutil.ReadAll(t) if err != nil || len(b) == 0 { return } verified = true return } }() // Copy bytes from dup to snapWriter. This will have a side effect that // everything read from dup will be written to wPipe. _, err = io.Copy(snapWriter, dup) wPipe.Close() if err != nil { rPipe.CloseWithError(err) return err } wg.Wait() if !verified { return ErrIncompleteSnapshot } return nil } // RaftSnapshotRestore wraps RaftSnapshotRestoreWithContext using context.Background. func (c *Sys) RaftSnapshotRestore(snapReader io.Reader, force bool) error { return c.RaftSnapshotRestoreWithContext(context.Background(), snapReader, force) } // RaftSnapshotRestoreWithContext reads the snapshot from the io.Reader and installs that // snapshot, returning the cluster to the state defined by it. 
func (c *Sys) RaftSnapshotRestoreWithContext(ctx context.Context, snapReader io.Reader, force bool) error { path := "/v1/sys/storage/raft/snapshot" if force { path = "/v1/sys/storage/raft/snapshot-force" } r := c.c.NewRequest(http.MethodPost, path) r.Body = snapReader resp, err := c.c.httpRequestWithContext(ctx, r) if err != nil { return err } defer resp.Body.Close() return nil } // RaftAutopilotState wraps RaftAutopilotStateWithContext using context.Background. func (c *Sys) RaftAutopilotState() (*AutopilotState, error) { return c.RaftAutopilotStateWithContext(context.Background()) } // RaftAutopilotStateWithContext returns the state of the raft cluster as seen by autopilot. func (c *Sys) RaftAutopilotStateWithContext(ctx context.Context) (*AutopilotState, error) { ctx, cancelFunc := c.c.withConfiguredTimeout(ctx) defer cancelFunc() r := c.c.NewRequest(http.MethodGet, "/v1/sys/storage/raft/autopilot/state") resp, err := c.c.rawRequestWithContext(ctx, r) if resp != nil { defer resp.Body.Close() if resp.StatusCode == 404 { return nil, nil } } if err != nil { return nil, err } secret, err := ParseSecret(resp.Body) if err != nil { return nil, err } if secret == nil || secret.Data == nil { return nil, errors.New("data from server response is empty") } var result AutopilotState err = mapstructure.Decode(secret.Data, &result) if err != nil { return nil, err } return &result, err } // RaftAutopilotConfiguration wraps RaftAutopilotConfigurationWithContext using context.Background. func (c *Sys) RaftAutopilotConfiguration() (*AutopilotConfig, error) { return c.RaftAutopilotConfigurationWithContext(context.Background()) } // RaftAutopilotConfigurationWithContext fetches the autopilot config. 
func (c *Sys) RaftAutopilotConfigurationWithContext(ctx context.Context) (*AutopilotConfig, error) { ctx, cancelFunc := c.c.withConfiguredTimeout(ctx) defer cancelFunc() r := c.c.NewRequest(http.MethodGet, "/v1/sys/storage/raft/autopilot/configuration") resp, err := c.c.rawRequestWithContext(ctx, r) if resp != nil { defer resp.Body.Close() if resp.StatusCode == 404 { return nil, nil } } if err != nil { return nil, err } secret, err := ParseSecret(resp.Body) if err != nil { return nil, err } if secret == nil { return nil, errors.New("data from server response is empty") } var result AutopilotConfig if err = mapstructure.Decode(secret.Data, &result); err != nil { return nil, err } if result.LastContactThreshold, err = parseutil.ParseDurationSecond(secret.Data["last_contact_threshold"]); err != nil { return nil, err } if result.DeadServerLastContactThreshold, err = parseutil.ParseDurationSecond(secret.Data["dead_server_last_contact_threshold"]); err != nil { return nil, err } if result.ServerStabilizationTime, err = parseutil.ParseDurationSecond(secret.Data["server_stabilization_time"]); err != nil { return nil, err } return &result, err } // PutRaftAutopilotConfiguration wraps PutRaftAutopilotConfigurationWithContext using context.Background. func (c *Sys) PutRaftAutopilotConfiguration(opts *AutopilotConfig) error { return c.PutRaftAutopilotConfigurationWithContext(context.Background(), opts) } // PutRaftAutopilotConfigurationWithContext allows modifying the raft autopilot configuration func (c *Sys) PutRaftAutopilotConfigurationWithContext(ctx context.Context, opts *AutopilotConfig) error { ctx, cancelFunc := c.c.withConfiguredTimeout(ctx) defer cancelFunc() r := c.c.NewRequest(http.MethodPost, "/v1/sys/storage/raft/autopilot/configuration") if err := r.SetJSONBody(opts); err != nil { return err } resp, err := c.c.rawRequestWithContext(ctx, r) if err != nil { return err } defer resp.Body.Close() return nil }