Fix TSAN failures in DistributedMutex tests (#5684)

Summary:
TSAN was not able to correctly instrument atomic bts and btr instructions, so
when TSAN is enabled implement those with std::atomic::fetch_or and
std::atomic::fetch_and. Also disable tests that fail on TSAN with false
negatives (we know these are false negatives because this other verifiably
correct program fails with the same TSAN error <link>)

```
make clean
TEST_TMPDIR=/dev/shm/rocksdb OPT=-g COMPILE_WITH_TSAN=1 make J=1 -j56 folly_synchronization_distributed_mutex_test
```

This is the code that fails with the same false-negative with TSAN
```
namespace {
class ExceptionWithConstructionTrack : public std::exception {
 public:
  explicit ExceptionWithConstructionTrack(int id)
      : id_{folly::to<std::string>(id)}, constructionTrack_{id} {}

  const char* what() const noexcept override {
    return id_.c_str();
  }

 private:
  std::string id_;
  TestConstruction constructionTrack_;
};

template <typename Storage, typename Atomic>
void transferCurrentException(Storage& storage, Atomic& produced) {
  assert(std::current_exception());
  new (&storage) std::exception_ptr(std::current_exception());
  produced->store(true, std::memory_order_release);
}

void concurrentExceptionPropagationStress(
    int numThreads,
    std::chrono::milliseconds milliseconds) {
  auto&& stop = std::atomic<bool>{false};
  auto&& exceptions = std::vector<std::aligned_storage<48, 8>::type>{};
  auto&& produced = std::vector<std::unique_ptr<std::atomic<bool>>>{};
  auto&& consumed = std::vector<std::unique_ptr<std::atomic<bool>>>{};
  auto&& consumers = std::vector<std::thread>{};
  for (auto i = 0; i < numThreads; ++i) {
    produced.emplace_back(new std::atomic<bool>{false});
    consumed.emplace_back(new std::atomic<bool>{false});
    exceptions.push_back({});
  }

  auto producer = std::thread{[&]() {
    auto counter = std::vector<int>(numThreads, 0);
    for (auto i = 0; true; i = ((i + 1) % numThreads)) {
      try {
        throw ExceptionWithConstructionTrack{counter.at(i)++};
      } catch (...) {
        transferCurrentException(exceptions.at(i), produced.at(i));
      }

      while (!consumed.at(i)->load(std::memory_order_acquire)) {
        if (stop.load(std::memory_order_acquire)) {
          return;
        }
      }

      consumed.at(i)->store(false, std::memory_order_release);
    }
  }};

  for (auto i = 0; i < numThreads; ++i) {
    consumers.emplace_back([&, i]() {
      auto counter = 0;
      while (true) {
        while (!produced.at(i)->load(std::memory_order_acquire)) {
          if (stop.load(std::memory_order_acquire)) {
            return;
          }
        }
        produced.at(i)->store(false, std::memory_order_release);

        try {
          auto storage = &exceptions.at(i);
          auto exc = folly::launder(
            reinterpret_cast<std::exception_ptr*>(storage));
          auto copy = std::move(*exc);
          exc->std::exception_ptr::~exception_ptr();
          std::rethrow_exception(std::move(copy));
        } catch (std::exception& exc) {
          auto value = std::stoi(exc.what());
          EXPECT_EQ(value, counter++);
        }

        consumed.at(i)->store(true, std::memory_order_release);
      }
    });
  }

  std::this_thread::sleep_for(milliseconds);
  stop.store(true);
  producer.join();
  for (auto& thread : consumers) {
    thread.join();
  }
}
} // namespace
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5684

Differential Revision: D16746077

Pulled By: miasantreble

fbshipit-source-id: 8af88dcf9161c05daec1a76290f577918638f79d
This commit is contained in:
Aaryaman Sagar 2019-08-14 16:58:11 -07:00 committed by Facebook Github Bot
parent 7785f61132
commit 77273d4137
5 changed files with 37 additions and 7 deletions

View File

@ -256,8 +256,8 @@ endif
ifdef COMPILE_WITH_TSAN ifdef COMPILE_WITH_TSAN
DISABLE_JEMALLOC=1 DISABLE_JEMALLOC=1
EXEC_LDFLAGS += -fsanitize=thread EXEC_LDFLAGS += -fsanitize=thread
PLATFORM_CCFLAGS += -fsanitize=thread -fPIC PLATFORM_CCFLAGS += -fsanitize=thread -fPIC -DFOLLY_SANITIZE_THREAD
PLATFORM_CXXFLAGS += -fsanitize=thread -fPIC PLATFORM_CXXFLAGS += -fsanitize=thread -fPIC -DFOLLY_SANITIZE_THREAD
# Turn off -pg when enabling TSAN testing, because that induces # Turn off -pg when enabling TSAN testing, because that induces
# a link failure. TODO: find the root cause # a link failure. TODO: find the root cause
PROFILING_FLAGS = PROFILING_FLAGS =
@ -1285,7 +1285,7 @@ db_merge_operator_test: db/db_merge_operator_test.o db/db_test_util.o $(LIBOBJEC
db_merge_operand_test: db/db_merge_operand_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) db_merge_operand_test: db/db_merge_operand_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)
db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)

View File

@ -13,3 +13,15 @@
#else #else
#define FOLLY_EXPORT #define FOLLY_EXPORT
#endif #endif
#if defined(__has_feature)
#define FOLLY_HAS_FEATURE(...) __has_feature(__VA_ARGS__)
#else
#define FOLLY_HAS_FEATURE(...) 0
#endif
#if FOLLY_HAS_FEATURE(thread_sanitizer) || __SANITIZE_THREAD__
#ifndef FOLLY_SANITIZE_THREAD
#define FOLLY_SANITIZE_THREAD 1
#endif
#endif

View File

@ -5,6 +5,8 @@
#pragma once #pragma once
#include <folly/CPortability.h>
#if defined(__arm__) #if defined(__arm__)
#define FOLLY_ARM 1 #define FOLLY_ARM 1
#else #else
@ -72,3 +74,11 @@ constexpr bool kIsMsvc = true;
constexpr bool kIsMsvc = false; constexpr bool kIsMsvc = false;
#endif #endif
} // namespace folly } // namespace folly
namespace folly {
#if FOLLY_SANITIZE_THREAD
constexpr bool kIsSanitizeThread = true;
#else
constexpr bool kIsSanitizeThread = false;
#endif
} // namespace folly

View File

@ -230,8 +230,9 @@ bool atomic_fetch_set(Atomic& atomic, std::size_t bit, std::memory_order mo) {
static_assert(!std::is_const<Atomic>{}, ""); static_assert(!std::is_const<Atomic>{}, "");
assert(bit < (sizeof(Integer) * 8)); assert(bit < (sizeof(Integer) * 8));
if (folly::kIsArchAmd64) { // do the optimized thing on x86 builds. Also, some versions of TSAN do not
// do the optimized thing on x86 builds // properly instrument the inline assembly, so avoid it when TSAN is enabled
if (folly::kIsArchAmd64 && !folly::kIsSanitizeThread) {
return detail::atomic_fetch_set_x86(atomic, bit, mo); return detail::atomic_fetch_set_x86(atomic, bit, mo);
} else { } else {
// otherwise default to the default implementation using fetch_or() // otherwise default to the default implementation using fetch_or()
@ -246,8 +247,9 @@ bool atomic_fetch_reset(Atomic& atomic, std::size_t bit, std::memory_order mo) {
static_assert(!std::is_const<Atomic>{}, ""); static_assert(!std::is_const<Atomic>{}, "");
assert(bit < (sizeof(Integer) * 8)); assert(bit < (sizeof(Integer) * 8));
if (folly::kIsArchAmd64) { // do the optimized thing on x86 builds. Also, some versions of TSAN do not
// do the optimized thing on x86 builds // properly instrument the inline assembly, so avoid it when TSAN is enabled
if (folly::kIsArchAmd64 && !folly::kIsSanitizeThread) {
return detail::atomic_fetch_reset_x86(atomic, bit, mo); return detail::atomic_fetch_reset_x86(atomic, bit, mo);
} else { } else {
// otherwise default to the default implementation using fetch_and() // otherwise default to the default implementation using fetch_and()

View File

@ -981,6 +981,12 @@ template <template <typename> class Atom = std::atomic>
void concurrentExceptionPropagationStress( void concurrentExceptionPropagationStress(
int numThreads, int numThreads,
std::chrono::milliseconds t) { std::chrono::milliseconds t) {
// this test fails under with a false negative under older versions of TSAN
// for some reason so disable it when TSAN is enabled
if (folly::kIsSanitizeThread) {
return;
}
TestConstruction::reset(); TestConstruction::reset();
auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{}; auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
auto&& threads = std::vector<std::thread>{}; auto&& threads = std::vector<std::thread>{};