From cd0980150bd0614a326302f534c23e585750b370 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 13 Nov 2014 16:34:29 -0500 Subject: [PATCH] Add concurrency to compacting SpatialDB Summary: This will speed up our import times Test Plan: Added simple unit test just to get code coverage Reviewers: sdong, ljin, yhchiang, rven, mohaps Reviewed By: mohaps Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D28869 --- include/rocksdb/utilities/spatial_db.h | 4 +- utilities/spatialdb/spatial_db.cc | 61 ++++++++++++++++++-------- utilities/spatialdb/spatial_db_test.cc | 5 ++- 3 files changed, 50 insertions(+), 20 deletions(-) diff --git a/include/rocksdb/utilities/spatial_db.h b/include/rocksdb/utilities/spatial_db.h index cba93cd5fa..1beb5c7f18 100644 --- a/include/rocksdb/utilities/spatial_db.h +++ b/include/rocksdb/utilities/spatial_db.h @@ -222,7 +222,9 @@ class SpatialDB : public StackableDB { // Calling Compact() after inserting a bunch of elements should speed up // reading. This is especially useful if you use SpatialDBOptions::bulk_load - virtual Status Compact() = 0; + // Num threads determines how many threads we'll use for compactions. Setting + // this to a bigger number will use more IO and CPU, but finish faster + virtual Status Compact(int num_threads = 1) = 0; // Query the specified spatial_index. Query will return all elements that // intersect bbox, but it may also return some extra elements. diff --git a/utilities/spatialdb/spatial_db.cc b/utilities/spatialdb/spatial_db.cc index 6fbb780bc1..2a4f7b14e2 100644 --- a/utilities/spatialdb/spatial_db.cc +++ b/utilities/spatialdb/spatial_db.cc @@ -11,10 +11,13 @@ #define __STDC_FORMAT_MACROS #endif +#include +#include #include #include #include -#include +#include +#include #include #include @@ -561,27 +564,49 @@ class SpatialDBImpl : public SpatialDB { return Write(write_options, &batch); } - virtual Status Compact() override { - // TODO(icanadi) maybe do this in parallel? 
- Status s, t; + virtual Status Compact(int num_threads) override { + std::vector column_families; + column_families.push_back(data_column_family_); + for (auto& iter : name_to_index_) { - t = Flush(FlushOptions(), iter.second.column_family); - if (!t.ok()) { - s = t; - } - t = CompactRange(iter.second.column_family, nullptr, nullptr); - if (!t.ok()) { - s = t; - } + column_families.push_back(iter.second.column_family); } - t = Flush(FlushOptions(), data_column_family_); - if (!t.ok()) { - s = t; + + std::mutex state_mutex; + std::condition_variable cv; + Status s; + int threads_running = 0; + + std::vector threads; + + for (auto cfh : column_families) { + threads.emplace_back([&, cfh] { + { + std::unique_lock lk(state_mutex); + cv.wait(lk, [&] { return threads_running < num_threads; }); + threads_running++; + } + + Status t = Flush(FlushOptions(), cfh); + if (t.ok()) { + t = CompactRange(cfh, nullptr, nullptr); + } + + { + std::unique_lock lk(state_mutex); + threads_running--; + if (s.ok() && !t.ok()) { + s = t; + } + cv.notify_one(); + } + }); } - t = CompactRange(data_column_family_, nullptr, nullptr); - if (!t.ok()) { - s = t; + + for (auto& t : threads) { + t.join(); } + return s; } diff --git a/utilities/spatialdb/spatial_db_test.cc b/utilities/spatialdb/spatial_db_test.cc index 166920b57d..0484f8c02b 100644 --- a/utilities/spatialdb/spatial_db_test.cc +++ b/utilities/spatialdb/spatial_db_test.cc @@ -245,7 +245,10 @@ TEST(SpatialDBTest, RandomizedTest) { elements.push_back(make_pair(blob, bbox)); } - db_->Compact(); + // parallel + db_->Compact(2); + // serial + db_->Compact(1); for (int i = 0; i < 1000; ++i) { BoundingBox int_bbox = RandomBoundingBox(128, &rnd, 10);