From 6b6601093c53abcd72141623de9ac564d79f3fb5 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Mon, 7 Nov 2016 13:51:03 -0500 Subject: [PATCH] Call fsync() for saving check/service state --- command/agent/agent.go | 57 ++++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 0afdabfec..bb294b446 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -664,6 +664,7 @@ func (a *Agent) sendCoordinate() { // persistService saves a service definition to a JSON file in the data dir func (a *Agent) persistService(service *structs.NodeService) error { svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID)) + wrapped := persistedService{ Token: a.state.ServiceToken(service.ID), Service: service, @@ -672,17 +673,8 @@ func (a *Agent) persistService(service *structs.NodeService) error { if err != nil { return err } - if err := os.MkdirAll(filepath.Dir(svcPath), 0700); err != nil { - return err - } - tempSvcPath := svcPath + ".tmp" - if err := ioutil.WriteFile(tempSvcPath, encoded, 0600); err != nil { - return err - } - if err := os.Rename(tempSvcPath, svcPath); err != nil { - return err - } - return nil + + return writeFileAtomic(svcPath, encoded) } // purgeService removes a persisted service definition file from the data dir @@ -709,18 +701,8 @@ func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *CheckType) err if err != nil { return err } - if err := os.MkdirAll(filepath.Dir(checkPath), 0700); err != nil { - return err - } - fh, err := os.OpenFile(checkPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) - if err != nil { - return err - } - defer fh.Close() - if _, err := fh.Write(encoded); err != nil { - return err - } - return nil + + return writeFileAtomic(checkPath, encoded) } // purgeCheck removes a persisted check definition file from the data dir @@ -732,6 +714,29 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error { return nil } +// writeFileAtomic writes the given contents to a temporary file in the same +// directory, does an fsync and then renames the file to its real path +func writeFileAtomic(path string, contents []byte) error { + tempPath := path + ".tmp" + if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil { + return err + } + fh, err := os.OpenFile(tempPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) + if err != nil { + return err + } + if _, err := fh.Write(contents); err != nil { + return err + } + if err := fh.Sync(); err != nil { + return err + } + if err := fh.Close(); err != nil { + return err + } + return os.Rename(tempPath, path) +} + // AddService is used to add a service entry. // This entry is persistent and the agent will make a best effort to // ensure it is registered @@ -1091,7 +1096,8 @@ func (a *Agent) persistCheckState(check *CheckTTL, status, output string) error // Write the state to the file file := filepath.Join(dir, checkIDHash(check.CheckID)) - // Create temp file in same folder, to make more likely atomic + + // Create temp file in same dir, to make more likely atomic tempFile := file + ".tmp" if err := ioutil.WriteFile(tempFile, buf, 0600); err != nil { @@ -1119,7 +1125,8 @@ func (a *Agent) loadCheckState(check *structs.HealthCheck) error { // Decode the state data var p persistedCheckState if err := json.Unmarshal(buf, &p); err != nil { - return fmt.Errorf("failed decoding check state: %s", err) + a.logger.Printf("[ERROR] agent: failed decoding check state: %s", err) + return a.purgeCheckState(check.CheckID) } // Check if the state has expired