Add structures for exposing thread events and operations.

Summary:
Add structures for exposing events and operations.  Event describes
high-level action about a thread such as doing compaciton or
doing flush, while an operation describes lower-level action
of a thread such as reading / writing a SST table, waiting for
mutex.  Events and operations are designed to be independent.
One thread would typically involve in one event and one operation.

Code instrument will be in a separate diff.

Test Plan:
Add unit-tests in thread_list_test
make dbg -j32
./thread_list_test
export ROCKSDB_TESTS=ThreadList
./db_test

Reviewers: ljin, igor, sdong

Reviewed By: sdong

Subscribers: rven, jonahcohen, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D29781
This commit is contained in:
Yueh-Hsuan Chiang 2014-12-30 10:39:13 -08:00
parent a801c1fb09
commit bf287b76e0
7 changed files with 419 additions and 101 deletions

View File

@ -9435,10 +9435,10 @@ TEST(DBTest, GetThreadList) {
env_->SleepForMicroseconds(100000); env_->SleepForMicroseconds(100000);
s = env_->GetThreadList(&thread_list); s = env_->GetThreadList(&thread_list);
ASSERT_OK(s); ASSERT_OK(s);
unsigned int thread_type_counts[ThreadStatus::ThreadType::TOTAL]; unsigned int thread_type_counts[ThreadStatus::NUM_THREAD_TYPES];
memset(thread_type_counts, 0, sizeof(thread_type_counts)); memset(thread_type_counts, 0, sizeof(thread_type_counts));
for (auto thread : thread_list) { for (auto thread : thread_list) {
ASSERT_LT(thread.thread_type, ThreadStatus::ThreadType::TOTAL); ASSERT_LT(thread.thread_type, ThreadStatus::NUM_THREAD_TYPES);
thread_type_counts[thread.thread_type]++; thread_type_counts[thread.thread_type]++;
} }
// Verify the total number of threades // Verify the total number of threades
@ -9447,11 +9447,11 @@ TEST(DBTest, GetThreadList) {
kHighPriCounts[test] + kLowPriCounts[test]); kHighPriCounts[test] + kLowPriCounts[test]);
// Verify the number of high-priority threads // Verify the number of high-priority threads
ASSERT_EQ( ASSERT_EQ(
thread_type_counts[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY], thread_type_counts[ThreadStatus::HIGH_PRIORITY],
kHighPriCounts[test]); kHighPriCounts[test]);
// Verify the number of low-priority threads // Verify the number of low-priority threads
ASSERT_EQ( ASSERT_EQ(
thread_type_counts[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY], thread_type_counts[ThreadStatus::LOW_PRIORITY],
kLowPriCounts[test]); kLowPriCounts[test]);
} }
if (i == 0) { if (i == 0) {

View File

@ -2,6 +2,14 @@
// This source code is licensed under the BSD-style license found in the // This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant // LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory. // of patent rights can be found in the PATENTS file in the same directory.
//
// This file defines the structures for exposing run-time status of any
// rocksdb-related thread. Such run-time status can be obtained via
// GetThreadList() API.
//
// Note that all thread-status features are still under-development, and
// thus APIs and class definitions might subject to change at this point.
// Will remove this comment once the APIs have been finalized.
#pragma once #pragma once
@ -22,29 +30,48 @@ namespace rocksdb {
// The status of active threads can be fetched using // The status of active threads can be fetched using
// rocksdb::GetThreadList(). // rocksdb::GetThreadList().
struct ThreadStatus { struct ThreadStatus {
enum ThreadType { // The type of a thread.
ROCKSDB_HIGH_PRIORITY = 0x0, enum ThreadType : int {
ROCKSDB_LOW_PRIORITY = 0x1, HIGH_PRIORITY = 0, // RocksDB BG thread in high-pri thread pool
USER_THREAD = 0x2, LOW_PRIORITY, // RocksDB BG thread in low-pri thread pool
TOTAL = 0x3 USER, // User thread (Non-RocksDB BG thread)
NUM_THREAD_TYPES
};
// The type used to refer to a thread operation.
// A thread operation describes high-level action of a thread.
// Examples include compaction and flush.
enum OperationType : int {
OP_UNKNOWN = 0,
OP_COMPACTION,
OP_FLUSH,
NUM_OP_TYPES
};
// The type used to refer to a thread state.
// A state describes lower-level action of a thread
// such as reading / writing a file or waiting for a mutex.
enum StateType : int {
STATE_UNKNOWN = 0,
NUM_STATE_TYPES
}; };
#if ROCKSDB_USING_THREAD_STATUS
ThreadStatus(const uint64_t _id, ThreadStatus(const uint64_t _id,
const ThreadType _thread_type, const ThreadType _thread_type,
const std::string& _db_name, const std::string& _db_name,
const std::string& _cf_name, const std::string& _cf_name,
const std::string& _event) : const OperationType _operation_type,
const StateType _state_type) :
thread_id(_id), thread_type(_thread_type), thread_id(_id), thread_type(_thread_type),
db_name(_db_name), db_name(_db_name),
cf_name(_cf_name), cf_name(_cf_name),
event(_event) {} operation_type(_operation_type), state_type(_state_type) {}
// An unique ID for the thread. // An unique ID for the thread.
const uint64_t thread_id; const uint64_t thread_id;
// The type of the thread, it could be ROCKSDB_HIGH_PRIORITY, // The type of the thread, it could be HIGH_PRIORITY,
// ROCKSDB_LOW_PRIORITY, and USER_THREAD // LOW_PRIORITY, and USER
const ThreadType thread_type; const ThreadType thread_type;
// The name of the DB instance where the thread is currently // The name of the DB instance where the thread is currently
@ -57,11 +84,11 @@ struct ThreadStatus {
// in any column family. // in any column family.
const std::string cf_name; const std::string cf_name;
// The event that the current thread is involved. // The operation (high-level action) that the current thread is involved.
// It would be set to empty string if the information about event const OperationType operation_type;
// is not currently available.
const std::string event; // The state (lower-level action) that the current thread is involved.
#endif // ROCKSDB_USING_THREAD_STATUS const StateType state_type;
}; };
} // namespace rocksdb } // namespace rocksdb

View File

@ -1694,8 +1694,8 @@ class PosixEnv : public Env {
// for thread-status // for thread-status
ThreadStatusUtil::SetThreadType(tp->env_, ThreadStatusUtil::SetThreadType(tp->env_,
(tp->GetThreadPriority() == Env::Priority::HIGH ? (tp->GetThreadPriority() == Env::Priority::HIGH ?
ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY : ThreadStatus::HIGH_PRIORITY :
ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY)); ThreadStatus::LOW_PRIORITY));
#endif #endif
delete meta; delete meta;
tp->BGThread(thread_id); tp->BGThread(thread_id);

View File

@ -14,46 +14,65 @@
namespace rocksdb { namespace rocksdb {
class SleepingBackgroundTask { class SimulatedBackgroundTask {
public: public:
SleepingBackgroundTask(const void* db_key, const std::string& db_name, SimulatedBackgroundTask(
const void* cf_key, const std::string& cf_name) const void* db_key, const std::string& db_name,
const void* cf_key, const std::string& cf_name,
const ThreadStatus::OperationType operation_type =
ThreadStatus::OP_UNKNOWN,
const ThreadStatus::StateType state_type =
ThreadStatus::STATE_UNKNOWN)
: db_key_(db_key), db_name_(db_name), : db_key_(db_key), db_name_(db_name),
cf_key_(cf_key), cf_name_(cf_name), cf_key_(cf_key), cf_name_(cf_name),
should_sleep_(true), sleeping_count_(0) { operation_type_(operation_type), state_type_(state_type),
should_run_(true), running_count_(0) {
Env::Default()->GetThreadStatusUpdater()->NewColumnFamilyInfo( Env::Default()->GetThreadStatusUpdater()->NewColumnFamilyInfo(
db_key_, db_name_, cf_key_, cf_name_); db_key_, db_name_, cf_key_, cf_name_);
} }
~SleepingBackgroundTask() { ~SimulatedBackgroundTask() {
Env::Default()->GetThreadStatusUpdater()->EraseDatabaseInfo(db_key_); Env::Default()->GetThreadStatusUpdater()->EraseDatabaseInfo(db_key_);
} }
void DoSleep() { void Run() {
Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_);
std::unique_lock<std::mutex> l(mutex_); std::unique_lock<std::mutex> l(mutex_);
sleeping_count_++; running_count_++;
while (should_sleep_) { Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_);
Env::Default()->GetThreadStatusUpdater()->SetThreadOperation(
operation_type_);
Env::Default()->GetThreadStatusUpdater()->SetThreadState(state_type_);
while (should_run_) {
bg_cv_.wait(l); bg_cv_.wait(l);
} }
sleeping_count_--; Env::Default()->GetThreadStatusUpdater()->ClearThreadState();
bg_cv_.notify_all(); Env::Default()->GetThreadStatusUpdater()->ClearThreadOperation();
Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(0); Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(0);
} running_count_--;
void WakeUp() {
std::unique_lock<std::mutex> l(mutex_);
should_sleep_ = false;
bg_cv_.notify_all(); bg_cv_.notify_all();
} }
void FinishAllTasks() {
std::unique_lock<std::mutex> l(mutex_);
should_run_ = false;
bg_cv_.notify_all();
}
void WaitUntilScheduled(int job_count, Env* env) {
while (running_count_ < job_count) {
env->SleepForMicroseconds(1000);
}
}
void WaitUntilDone() { void WaitUntilDone() {
std::unique_lock<std::mutex> l(mutex_); std::unique_lock<std::mutex> l(mutex_);
while (sleeping_count_ > 0) { while (running_count_ > 0) {
bg_cv_.wait(l); bg_cv_.wait(l);
} }
} }
static void DoSleepTask(void* arg) { static void DoSimulatedTask(void* arg) {
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep(); reinterpret_cast<SimulatedBackgroundTask*>(arg)->Run();
} }
private: private:
@ -61,10 +80,12 @@ class SleepingBackgroundTask {
const std::string db_name_; const std::string db_name_;
const void* cf_key_; const void* cf_key_;
const std::string cf_name_; const std::string cf_name_;
const ThreadStatus::OperationType operation_type_;
const ThreadStatus::StateType state_type_;
std::mutex mutex_; std::mutex mutex_;
std::condition_variable bg_cv_; std::condition_variable bg_cv_;
bool should_sleep_; bool should_run_;
std::atomic<int> sleeping_count_; std::atomic<int> running_count_;
}; };
class ThreadListTest { class ThreadListTest {
@ -73,72 +94,232 @@ class ThreadListTest {
} }
}; };
TEST(ThreadListTest, EventTables) {
// verify the global tables for operations and states are properly indexed.
for (int type = 0; type != ThreadStatus::NUM_OP_TYPES; ++type) {
ASSERT_EQ(global_operation_table[type].type, type);
}
for (int type = 0; type != ThreadStatus::NUM_STATE_TYPES; ++type) {
ASSERT_EQ(global_state_table[type].type, type);
}
}
TEST(ThreadListTest, SimpleColumnFamilyInfoTest) { TEST(ThreadListTest, SimpleColumnFamilyInfoTest) {
Env* env = Env::Default(); Env* env = Env::Default();
const int kHighPriorityThreads = 3; const int kHighPriorityThreads = 3;
const int kLowPriorityThreads = 5; const int kLowPriorityThreads = 5;
const int kSleepingHighPriThreads = kHighPriorityThreads - 1; const int kSimulatedHighPriThreads = kHighPriorityThreads - 1;
const int kSleepingLowPriThreads = kLowPriorityThreads / 3; const int kSimulatedLowPriThreads = kLowPriorityThreads / 3;
env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH); env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH);
env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW); env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW);
SleepingBackgroundTask sleeping_task( SimulatedBackgroundTask running_task(
reinterpret_cast<void*>(1234), "sleeping", reinterpret_cast<void*>(1234), "running",
reinterpret_cast<void*>(5678), "pikachu"); reinterpret_cast<void*>(5678), "pikachu");
for (int test = 0; test < kSleepingHighPriThreads; ++test) { for (int test = 0; test < kSimulatedHighPriThreads; ++test) {
env->Schedule(&SleepingBackgroundTask::DoSleepTask, env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
&sleeping_task, Env::Priority::HIGH); &running_task, Env::Priority::HIGH);
} }
for (int test = 0; test < kSleepingLowPriThreads; ++test) { for (int test = 0; test < kSimulatedLowPriThreads; ++test) {
env->Schedule(&SleepingBackgroundTask::DoSleepTask, env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
&sleeping_task, Env::Priority::LOW); &running_task, Env::Priority::LOW);
} }
running_task.WaitUntilScheduled(
// make sure everything is scheduled. kSimulatedHighPriThreads + kSimulatedLowPriThreads, env);
env->SleepForMicroseconds(10000);
std::vector<ThreadStatus> thread_list; std::vector<ThreadStatus> thread_list;
// Verify the number of sleeping threads in each pool. // Verify the number of running threads in each pool.
env->GetThreadList(&thread_list); env->GetThreadList(&thread_list);
int sleeping_count[ThreadStatus::ThreadType::TOTAL] = {0}; int running_count[ThreadStatus::NUM_THREAD_TYPES] = {0};
for (auto thread_status : thread_list) { for (auto thread_status : thread_list) {
if (thread_status.cf_name == "pikachu" && if (thread_status.cf_name == "pikachu" &&
thread_status.db_name == "sleeping") { thread_status.db_name == "running") {
sleeping_count[thread_status.thread_type]++; running_count[thread_status.thread_type]++;
} }
} }
ASSERT_EQ( ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY], running_count[ThreadStatus::HIGH_PRIORITY],
kSleepingHighPriThreads); kSimulatedHighPriThreads);
ASSERT_EQ( ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY], running_count[ThreadStatus::LOW_PRIORITY],
kSleepingLowPriThreads); kSimulatedLowPriThreads);
ASSERT_EQ( ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::USER_THREAD], 0); running_count[ThreadStatus::USER], 0);
sleeping_task.WakeUp(); running_task.FinishAllTasks();
sleeping_task.WaitUntilDone(); running_task.WaitUntilDone();
// Verify none of the threads are sleeping // Verify none of the threads are running
env->GetThreadList(&thread_list); env->GetThreadList(&thread_list);
for (int i = 0; i < ThreadStatus::ThreadType::TOTAL; ++i) {
sleeping_count[i] = 0;
}
for (int i = 0; i < ThreadStatus::NUM_THREAD_TYPES; ++i) {
running_count[i] = 0;
}
for (auto thread_status : thread_list) { for (auto thread_status : thread_list) {
if (thread_status.cf_name == "pikachu" && if (thread_status.cf_name == "pikachu" &&
thread_status.db_name == "sleeping") { thread_status.db_name == "running") {
sleeping_count[thread_status.thread_type]++; running_count[thread_status.thread_type]++;
} }
} }
ASSERT_EQ( ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY], 0); running_count[ThreadStatus::HIGH_PRIORITY], 0);
ASSERT_EQ( ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY], 0); running_count[ThreadStatus::LOW_PRIORITY], 0);
ASSERT_EQ( ASSERT_EQ(
sleeping_count[ThreadStatus::ThreadType::USER_THREAD], 0); running_count[ThreadStatus::USER], 0);
}
namespace {
void UpdateStatusCounts(
const std::vector<ThreadStatus>& thread_list,
int operation_counts[], int state_counts[]) {
for (auto thread_status : thread_list) {
operation_counts[thread_status.operation_type]++;
state_counts[thread_status.state_type]++;
}
}
void VerifyAndResetCounts(
const int correct_counts[], int collected_counts[], int size) {
for (int i = 0; i < size; ++i) {
ASSERT_EQ(collected_counts[i], correct_counts[i]);
collected_counts[i] = 0;
}
}
void UpdateCount(
int operation_counts[], int from_event, int to_event, int amount) {
operation_counts[from_event] -= amount;
operation_counts[to_event] += amount;
}
} // namespace
TEST(ThreadListTest, SimpleEventTest) {
Env* env = Env::Default();
// simulated tasks
const int kFlushWriteTasks = 3;
SimulatedBackgroundTask flush_write_task(
reinterpret_cast<void*>(1234), "running",
reinterpret_cast<void*>(5678), "pikachu",
ThreadStatus::OP_FLUSH);
const int kCompactionWriteTasks = 4;
SimulatedBackgroundTask compaction_write_task(
reinterpret_cast<void*>(1234), "running",
reinterpret_cast<void*>(5678), "pikachu",
ThreadStatus::OP_COMPACTION);
const int kCompactionReadTasks = 5;
SimulatedBackgroundTask compaction_read_task(
reinterpret_cast<void*>(1234), "running",
reinterpret_cast<void*>(5678), "pikachu",
ThreadStatus::OP_COMPACTION);
const int kCompactionWaitTasks = 6;
SimulatedBackgroundTask compaction_wait_task(
reinterpret_cast<void*>(1234), "running",
reinterpret_cast<void*>(5678), "pikachu",
ThreadStatus::OP_COMPACTION);
// setup right answers
int correct_operation_counts[ThreadStatus::NUM_OP_TYPES] = {0};
correct_operation_counts[ThreadStatus::OP_FLUSH] =
kFlushWriteTasks;
correct_operation_counts[ThreadStatus::OP_COMPACTION] =
kCompactionWriteTasks + kCompactionReadTasks + kCompactionWaitTasks;
env->SetBackgroundThreads(
correct_operation_counts[ThreadStatus::OP_FLUSH], Env::HIGH);
env->SetBackgroundThreads(
correct_operation_counts[ThreadStatus::OP_COMPACTION], Env::LOW);
// schedule the simulated tasks
for (int t = 0; t < kFlushWriteTasks; ++t) {
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
&flush_write_task, Env::Priority::HIGH);
}
flush_write_task.WaitUntilScheduled(kFlushWriteTasks, env);
for (int t = 0; t < kCompactionWriteTasks; ++t) {
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
&compaction_write_task, Env::Priority::LOW);
}
compaction_write_task.WaitUntilScheduled(kCompactionWriteTasks, env);
for (int t = 0; t < kCompactionReadTasks; ++t) {
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
&compaction_read_task, Env::Priority::LOW);
}
compaction_read_task.WaitUntilScheduled(kCompactionReadTasks, env);
for (int t = 0; t < kCompactionWaitTasks; ++t) {
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
&compaction_wait_task, Env::Priority::LOW);
}
compaction_wait_task.WaitUntilScheduled(kCompactionWaitTasks, env);
// verify the thread-status
int operation_counts[ThreadStatus::NUM_OP_TYPES] = {0};
int state_counts[ThreadStatus::NUM_STATE_TYPES] = {0};
std::vector<ThreadStatus> thread_list;
env->GetThreadList(&thread_list);
UpdateStatusCounts(thread_list, operation_counts, state_counts);
VerifyAndResetCounts(correct_operation_counts, operation_counts,
ThreadStatus::NUM_OP_TYPES);
// terminate compaction-wait tasks and see if the thread-status
// reflects this update
compaction_wait_task.FinishAllTasks();
compaction_wait_task.WaitUntilDone();
UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
ThreadStatus::OP_UNKNOWN, kCompactionWaitTasks);
env->GetThreadList(&thread_list);
UpdateStatusCounts(thread_list, operation_counts, state_counts);
VerifyAndResetCounts(correct_operation_counts, operation_counts,
ThreadStatus::NUM_OP_TYPES);
// terminate flush-write tasks and see if the thread-status
// reflects this update
flush_write_task.FinishAllTasks();
flush_write_task.WaitUntilDone();
UpdateCount(correct_operation_counts, ThreadStatus::OP_FLUSH,
ThreadStatus::OP_UNKNOWN, kFlushWriteTasks);
env->GetThreadList(&thread_list);
UpdateStatusCounts(thread_list, operation_counts, state_counts);
VerifyAndResetCounts(correct_operation_counts, operation_counts,
ThreadStatus::NUM_OP_TYPES);
// terminate compaction-write tasks and see if the thread-status
// reflects this update
compaction_write_task.FinishAllTasks();
compaction_write_task.WaitUntilDone();
UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
ThreadStatus::OP_UNKNOWN, kCompactionWriteTasks);
env->GetThreadList(&thread_list);
UpdateStatusCounts(thread_list, operation_counts, state_counts);
VerifyAndResetCounts(correct_operation_counts, operation_counts,
ThreadStatus::NUM_OP_TYPES);
// terminate compaction-write tasks and see if the thread-status
// reflects this update
compaction_read_task.FinishAllTasks();
compaction_read_task.WaitUntilDone();
UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
ThreadStatus::OP_UNKNOWN, kCompactionReadTasks);
env->GetThreadList(&thread_list);
UpdateStatusCounts(thread_list, operation_counts, state_counts);
VerifyAndResetCounts(correct_operation_counts, operation_counts,
ThreadStatus::NUM_OP_TYPES);
} }
} // namespace rocksdb } // namespace rocksdb

68
util/thread_operation.h Normal file
View File

@ -0,0 +1,68 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// This file defines the structures for thread operation and state.
// Thread operations are used to describe high level action of a
// thread such as doing compaction or flush, while thread state
// are used to describe lower-level action such as reading /
// writing a file or waiting for a mutex. Operations and states
// are designed to be independent. Typically, a thread usually involves
// in one operation and one state at any specific point in time.
#pragma once
#include "include/rocksdb/thread_status.h"
#include <string>
namespace rocksdb {
#if ROCKSDB_USING_THREAD_STATUS
// The structure that describes a major thread operation.
struct OperationInfo {
const ThreadStatus::OperationType type;
const std::string name;
};
// The global operation table.
//
// When updating a status of a thread, the pointer of the OperationInfo
// of the current ThreadStatusData will be pointing to one of the
// rows in this global table.
//
// Note that it's not designed to be constant as in the future we
// might consider adding global count to the OperationInfo.
static OperationInfo global_operation_table[] = {
{ThreadStatus::OP_UNKNOWN, ""},
{ThreadStatus::OP_COMPACTION, "Compaction"},
{ThreadStatus::OP_FLUSH, "Flush"}
};
// The structure that describes a state.
struct StateInfo {
const ThreadStatus::StateType type;
const std::string name;
};
// The global state table.
//
// When updating a status of a thread, the pointer of the StateInfo
// of the current ThreadStatusData will be pointing to one of the
// rows in this global table.
static StateInfo global_state_table[] = {
{ThreadStatus::STATE_UNKNOWN, ""},
};
#else
struct OperationInfo {
};
struct StateInfo {
};
#endif // ROCKSDB_USING_THREAD_STATUS
} // namespace rocksdb

View File

@ -34,10 +34,28 @@ void ThreadStatusUpdater::SetColumnFamilyInfoKey(
data->cf_key.store(cf_key, std::memory_order_relaxed); data->cf_key.store(cf_key, std::memory_order_relaxed);
} }
void ThreadStatusUpdater::SetEventInfoPtr( void ThreadStatusUpdater::SetThreadOperation(
const ThreadEventInfo* event_info) { const ThreadStatus::OperationType type) {
auto* data = InitAndGet(); auto* data = InitAndGet();
data->event_info.store(event_info, std::memory_order_relaxed); data->operation_type.store(type, std::memory_order_relaxed);
}
void ThreadStatusUpdater::ClearThreadOperation() {
auto* data = InitAndGet();
data->operation_type.store(
ThreadStatus::OP_UNKNOWN, std::memory_order_relaxed);
}
void ThreadStatusUpdater::SetThreadState(
const ThreadStatus::StateType type) {
auto* data = InitAndGet();
data->state_type.store(type, std::memory_order_relaxed);
}
void ThreadStatusUpdater::ClearThreadState() {
auto* data = InitAndGet();
data->state_type.store(
ThreadStatus::STATE_UNKNOWN, std::memory_order_relaxed);
} }
Status ThreadStatusUpdater::GetThreadList( Status ThreadStatusUpdater::GetThreadList(
@ -50,30 +68,35 @@ Status ThreadStatusUpdater::GetThreadList(
assert(thread_data); assert(thread_data);
auto thread_type = thread_data->thread_type.load( auto thread_type = thread_data->thread_type.load(
std::memory_order_relaxed); std::memory_order_relaxed);
// Since any change to cf_info_map requires thread_list_mutex,
// which is currently held by GetThreadList(), here we can safely
// use "memory_order_relaxed" to load the cf_key.
auto cf_key = thread_data->cf_key.load( auto cf_key = thread_data->cf_key.load(
std::memory_order_relaxed); std::memory_order_relaxed);
auto iter = cf_info_map_.find(cf_key); auto iter = cf_info_map_.find(cf_key);
assert(cf_key == 0 || iter != cf_info_map_.end()); assert(cf_key == 0 || iter != cf_info_map_.end());
auto* cf_info = iter != cf_info_map_.end() ? auto* cf_info = iter != cf_info_map_.end() ?
iter->second.get() : nullptr; iter->second.get() : nullptr;
auto* event_info = thread_data->event_info.load(
std::memory_order_relaxed);
const std::string* db_name = nullptr; const std::string* db_name = nullptr;
const std::string* cf_name = nullptr; const std::string* cf_name = nullptr;
const std::string* event_name = nullptr; ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN;
ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN;
if (cf_info != nullptr) { if (cf_info != nullptr) {
db_name = &cf_info->db_name; db_name = &cf_info->db_name;
cf_name = &cf_info->cf_name; cf_name = &cf_info->cf_name;
op_type = thread_data->operation_type.load(
std::memory_order_relaxed);
// display lower-level info only when higher-level info is available. // display lower-level info only when higher-level info is available.
if (event_info != nullptr) { if (op_type != ThreadStatus::OP_UNKNOWN) {
event_name = &event_info->event_name; state_type = thread_data->state_type.load(
std::memory_order_relaxed);
} }
} }
thread_list->emplace_back( thread_list->emplace_back(
thread_data->thread_id, thread_type, thread_data->thread_id, thread_type,
db_name ? *db_name : "", db_name ? *db_name : "",
cf_name ? *cf_name : "", cf_name ? *cf_name : "",
event_name ? *event_name : ""); op_type, state_type);
} }
return Status::OK(); return Status::OK();
@ -93,6 +116,8 @@ ThreadStatusData* ThreadStatusUpdater::InitAndGet() {
void ThreadStatusUpdater::NewColumnFamilyInfo( void ThreadStatusUpdater::NewColumnFamilyInfo(
const void* db_key, const std::string& db_name, const void* db_key, const std::string& db_name,
const void* cf_key, const std::string& cf_name) { const void* cf_key, const std::string& cf_name) {
// Acquiring same lock as GetThreadList() to guarantee
// a consistent view of global column family table (cf_info_map).
std::lock_guard<std::mutex> lck(thread_list_mutex_); std::lock_guard<std::mutex> lck(thread_list_mutex_);
cf_info_map_[cf_key].reset( cf_info_map_[cf_key].reset(
@ -101,6 +126,8 @@ void ThreadStatusUpdater::NewColumnFamilyInfo(
} }
void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) { void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) {
// Acquiring same lock as GetThreadList() to guarantee
// a consistent view of global column family table (cf_info_map).
std::lock_guard<std::mutex> lck(thread_list_mutex_); std::lock_guard<std::mutex> lck(thread_list_mutex_);
auto cf_pair = cf_info_map_.find(cf_key); auto cf_pair = cf_info_map_.find(cf_key);
assert(cf_pair != cf_info_map_.end()); assert(cf_pair != cf_info_map_.end());
@ -122,6 +149,8 @@ void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) {
} }
void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) { void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {
// Acquiring same lock as GetThreadList() to guarantee
// a consistent view of global column family table (cf_info_map).
std::lock_guard<std::mutex> lck(thread_list_mutex_); std::lock_guard<std::mutex> lck(thread_list_mutex_);
auto db_pair = db_key_map_.find(db_key); auto db_pair = db_key_map_.find(db_key);
if (UNLIKELY(db_pair == db_key_map_.end())) { if (UNLIKELY(db_pair == db_key_map_.end())) {
@ -154,8 +183,18 @@ void ThreadStatusUpdater::SetColumnFamilyInfoKey(
const void* cf_key) { const void* cf_key) {
} }
void ThreadStatusUpdater::SetEventInfoPtr( void ThreadStatusUpdater::SetThreadOperation(
const ThreadEventInfo* event_info) { const ThreadStatus::OperationType type) {
}
void ThreadStatusUpdater::ClearThreadOperation() {
}
void ThreadStatusUpdater::SetThreadState(
const ThreadStatus::StateType type) {
}
void ThreadStatusUpdater::ClearThreadState() {
} }
Status ThreadStatusUpdater::GetThreadList( Status ThreadStatusUpdater::GetThreadList(

View File

@ -22,7 +22,7 @@
// should be ignored. // should be ignored.
// //
// The high to low level information would be: // The high to low level information would be:
// thread_id > thread_type > db > cf > event > event_count > event_details // thread_id > thread_type > db > cf > operation > state
// //
// This means user might not always get full information, but whenever // This means user might not always get full information, but whenever
// returned by the GetThreadList() is guaranteed to be consistent. // returned by the GetThreadList() is guaranteed to be consistent.
@ -37,6 +37,7 @@
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/thread_status.h" #include "rocksdb/thread_status.h"
#include "port/port_posix.h" #include "port/port_posix.h"
#include "util/thread_operation.h"
namespace rocksdb { namespace rocksdb {
@ -57,27 +58,21 @@ struct ConstantColumnFamilyInfo {
#endif // ROCKSDB_USING_THREAD_STATUS #endif // ROCKSDB_USING_THREAD_STATUS
}; };
// The structure that describes an event.
struct ThreadEventInfo {
#if ROCKSDB_USING_THREAD_STATUS
public:
const std::string event_name;
#endif // ROCKSDB_USING_THREAD_STATUS
};
// the internal data-structure that is used to reflect the current // the internal data-structure that is used to reflect the current
// status of a thread using a set of atomic pointers. // status of a thread using a set of atomic pointers.
struct ThreadStatusData { struct ThreadStatusData {
#if ROCKSDB_USING_THREAD_STATUS #if ROCKSDB_USING_THREAD_STATUS
explicit ThreadStatusData() : thread_id(0) { explicit ThreadStatusData() : thread_id(0) {
thread_type.store(ThreadStatus::ThreadType::USER_THREAD); thread_type.store(ThreadStatus::USER);
cf_key.store(0); cf_key.store(0);
event_info.store(nullptr); operation_type.store(ThreadStatus::OP_UNKNOWN);
state_type.store(ThreadStatus::STATE_UNKNOWN);
} }
uint64_t thread_id; uint64_t thread_id;
std::atomic<ThreadStatus::ThreadType> thread_type; std::atomic<ThreadStatus::ThreadType> thread_type;
std::atomic<const void*> cf_key; std::atomic<const void*> cf_key;
std::atomic<const ThreadEventInfo*> event_info; std::atomic<ThreadStatus::OperationType> operation_type;
std::atomic<ThreadStatus::StateType> state_type;
#endif // ROCKSDB_USING_THREAD_STATUS #endif // ROCKSDB_USING_THREAD_STATUS
}; };
@ -103,12 +98,20 @@ class ThreadStatusUpdater {
void SetThreadType(ThreadStatus::ThreadType ttype); void SetThreadType(ThreadStatus::ThreadType ttype);
// Update the column-family info of the current thread by setting // Update the column-family info of the current thread by setting
// its thread-local pointer of ThreadEventInfo to the correct entry. // its thread-local pointer of ThreadStateInfo to the correct entry.
void SetColumnFamilyInfoKey(const void* cf_key); void SetColumnFamilyInfoKey(const void* cf_key);
// Update the event info of the current thread by setting // Update the thread operation of the current thread.
// its thread-local pointer of ThreadEventInfo to the correct entry. void SetThreadOperation(const ThreadStatus::OperationType type);
void SetEventInfoPtr(const ThreadEventInfo* event_info);
// Clear thread operation of the current thread.
void ClearThreadOperation();
// Update the thread state of the current thread.
void SetThreadState(const ThreadStatus::StateType type);
// Clear the thread state of the current thread.
void ClearThreadState();
// Obtain the status of all active registered threads. // Obtain the status of all active registered threads.
Status GetThreadList( Status GetThreadList(