Call fsync() for saving check/service state

This commit is contained in:
Kyle Havlovitz 2016-11-07 13:51:03 -05:00
parent 40d1c279dd
commit 6b6601093c
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
1 changed file with 32 additions and 25 deletions

View File

@ -664,6 +664,7 @@ func (a *Agent) sendCoordinate() {
// persistService saves a service definition to a JSON file in the data dir // persistService saves a service definition to a JSON file in the data dir
func (a *Agent) persistService(service *structs.NodeService) error { func (a *Agent) persistService(service *structs.NodeService) error {
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID)) svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
wrapped := persistedService{ wrapped := persistedService{
Token: a.state.ServiceToken(service.ID), Token: a.state.ServiceToken(service.ID),
Service: service, Service: service,
@ -672,17 +673,8 @@ func (a *Agent) persistService(service *structs.NodeService) error {
if err != nil { if err != nil {
return err return err
} }
if err := os.MkdirAll(filepath.Dir(svcPath), 0700); err != nil {
return err return writeFileAtomic(svcPath, encoded)
}
tempSvcPath := svcPath + ".tmp"
if err := ioutil.WriteFile(tempSvcPath, encoded, 0600); err != nil {
return err
}
if err := os.Rename(tempSvcPath, svcPath); err != nil {
return err
}
return nil
} }
// purgeService removes a persisted service definition file from the data dir // purgeService removes a persisted service definition file from the data dir
@ -709,18 +701,8 @@ func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *CheckType) err
if err != nil { if err != nil {
return err return err
} }
if err := os.MkdirAll(filepath.Dir(checkPath), 0700); err != nil {
return err return writeFileAtomic(checkPath, encoded)
}
fh, err := os.OpenFile(checkPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
return err
}
defer fh.Close()
if _, err := fh.Write(encoded); err != nil {
return err
}
return nil
} }
// purgeCheck removes a persisted check definition file from the data dir // purgeCheck removes a persisted check definition file from the data dir
@ -732,6 +714,29 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error {
return nil return nil
} }
// writeFileAtomic writes the given contents to a temporary file in the same
// directory as path, fsyncs it, and then renames it over path. The parent
// directory is created (mode 0700) if it does not exist, and the file itself
// is created with mode 0600.
//
// The temporary file is named path + ".tmp". On any failure after it has been
// opened, the handle is closed and the temporary file is removed so that a
// failed write neither leaks a file descriptor nor leaves a stale ".tmp" file
// behind. The first error encountered is the one returned.
func writeFileAtomic(path string, contents []byte) error {
	if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
		return err
	}

	tempPath := path + ".tmp"
	fh, err := os.OpenFile(tempPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
	if err != nil {
		return err
	}

	// Write and flush to stable storage before the rename makes the data
	// visible under its real name.
	_, err = fh.Write(contents)
	if err == nil {
		err = fh.Sync()
	}

	// Always close the handle, even when the write or sync failed; keep the
	// earliest error since it is the root cause.
	if cerr := fh.Close(); err == nil {
		err = cerr
	}

	if err == nil {
		err = os.Rename(tempPath, path)
	}
	if err != nil {
		// Best-effort cleanup: the original error is what the caller needs,
		// so a failure to remove the temp file is deliberately ignored.
		_ = os.Remove(tempPath)
		return err
	}
	return nil
}
// AddService is used to add a service entry. // AddService is used to add a service entry.
// This entry is persistent and the agent will make a best effort to // This entry is persistent and the agent will make a best effort to
// ensure it is registered // ensure it is registered
@ -1091,7 +1096,8 @@ func (a *Agent) persistCheckState(check *CheckTTL, status, output string) error
// Write the state to the file // Write the state to the file
file := filepath.Join(dir, checkIDHash(check.CheckID)) file := filepath.Join(dir, checkIDHash(check.CheckID))
// Create temp file in same folder, to make more likely atomic
// Create temp file in same dir, to make more likely atomic
tempFile := file + ".tmp" tempFile := file + ".tmp"
if err := ioutil.WriteFile(tempFile, buf, 0600); err != nil { if err := ioutil.WriteFile(tempFile, buf, 0600); err != nil {
@ -1119,7 +1125,8 @@ func (a *Agent) loadCheckState(check *structs.HealthCheck) error {
// Decode the state data // Decode the state data
var p persistedCheckState var p persistedCheckState
if err := json.Unmarshal(buf, &p); err != nil { if err := json.Unmarshal(buf, &p); err != nil {
return fmt.Errorf("failed decoding check state: %s", err) a.logger.Printf("[ERROR] agent: failed decoding check state: %s", err)
return a.purgeCheckState(check.CheckID)
} }
// Check if the state has expired // Check if the state has expired