From 9758c9dfd744f252bf3351c1a212e05c9f7fc857 Mon Sep 17 00:00:00 2001 From: Matt Callanan Date: Mon, 26 Sep 2022 10:23:33 -0700 Subject: [PATCH] Add `snappy::CompressFromIOVec`. This reads from an `iovec` array rather than from a `char` array as in `snappy::Compress`. PiperOrigin-RevId: 476930623 --- snappy.cc | 92 ++++++++++++++++++++++++++++++++++++++++++++- snappy.h | 17 ++++++++- snappy_benchmark.cc | 52 ++++++++++++++++++++++++- snappy_unittest.cc | 80 ++++++++++++++++++++++++++++++--------- 4 files changed, 218 insertions(+), 23 deletions(-) diff --git a/snappy.cc b/snappy.cc index 6502cfd..5457377 100644 --- a/snappy.cc +++ b/snappy.cc @@ -1580,6 +1580,67 @@ size_t Compress(Source* reader, Sink* writer) { // IOVec interfaces // ----------------------------------------------------------------------- +// A `Source` implementation that yields the contents of an `iovec` array. Note +// that `total_size` is the total number of bytes to be read from the elements +// of `iov` (_not_ the total number of elements in `iov`). +class SnappyIOVecReader : public Source { + public: + SnappyIOVecReader(const struct iovec* iov, size_t total_size) + : curr_iov_(iov), + curr_pos_(total_size > 0 ? reinterpret_cast(iov->iov_base) + : nullptr), + curr_size_remaining_(total_size > 0 ? iov->iov_len : 0), + total_size_remaining_(total_size) { + // Skip empty leading `iovec`s. + if (total_size > 0 && curr_size_remaining_ == 0) Advance(); + } + + ~SnappyIOVecReader() = default; + + size_t Available() const { return total_size_remaining_; } + + const char* Peek(size_t* len) { + *len = curr_size_remaining_; + return curr_pos_; + } + + void Skip(size_t n) { + while (n >= curr_size_remaining_ && n > 0) { + n -= curr_size_remaining_; + Advance(); + } + curr_size_remaining_ -= n; + total_size_remaining_ -= n; + curr_pos_ += n; + } + + private: + // Advances to the next nonempty `iovec` and updates related variables. + void Advance() { + do { + assert(total_size_remaining_ >= curr_size_remaining_); + total_size_remaining_ -= curr_size_remaining_; + if (total_size_remaining_ == 0) { + curr_pos_ = nullptr; + curr_size_remaining_ = 0; + return; + } + ++curr_iov_; + curr_pos_ = reinterpret_cast(curr_iov_->iov_base); + curr_size_remaining_ = curr_iov_->iov_len; + } while (curr_size_remaining_ == 0); + } + + // The `iovec` currently being read. + const struct iovec* curr_iov_; + // The location in `curr_iov_` currently being read. + const char* curr_pos_; + // The amount of unread data in `curr_iov_`. + size_t curr_size_remaining_; + // The amount of unread data in the entire input array. + size_t total_size_remaining_; +}; + // A type that writes to an iovec. // Note that this is not a "ByteSink", but a type that matches the // Writer template argument to SnappyDecompressor::DecompressAllTags(). @@ -1954,6 +2015,16 @@ void RawCompress(const char* input, size_t input_length, char* compressed, *compressed_length = (writer.CurrentDestination() - compressed); } +void RawCompressFromIOVec(const struct iovec* iov, size_t uncompressed_length, + char* compressed, size_t* compressed_length) { + SnappyIOVecReader reader(iov, uncompressed_length); + UncheckedByteArraySink writer(compressed); + Compress(&reader, &writer); + + // Compute how many bytes were added. + *compressed_length = writer.CurrentDestination() - compressed; +} + size_t Compress(const char* input, size_t input_length, std::string* compressed) { // Pre-grow the buffer to the max length of the compressed output @@ -1962,7 +2033,26 @@ size_t Compress(const char* input, size_t input_length, size_t compressed_length; RawCompress(input, input_length, string_as_array(compressed), &compressed_length); - compressed->resize(compressed_length); + compressed->erase(compressed_length); + return compressed_length; +} + +size_t CompressFromIOVec(const struct iovec* iov, size_t iov_cnt, + std::string* compressed) { + // Compute the number of bytes to be compressed. + size_t uncompressed_length = 0; + for (int i = 0; i < iov_cnt; ++i) { + uncompressed_length += iov[i].iov_len; + } + + // Pre-grow the buffer to the max length of the compressed output. + STLStringResizeUninitialized(compressed, MaxCompressedLength( + uncompressed_length)); + + size_t compressed_length; + RawCompressFromIOVec(iov, uncompressed_length, string_as_array(compressed), + &compressed_length); + compressed->erase(compressed_length); return compressed_length; } diff --git a/snappy.h b/snappy.h index e4fdad3..e12b658 100644 --- a/snappy.h +++ b/snappy.h @@ -71,14 +71,21 @@ namespace snappy { // Higher-level string based routines (should be sufficient for most users) // ------------------------------------------------------------------------ - // Sets "*compressed" to the compressed version of "input[0,input_length-1]". + // Sets "*compressed" to the compressed version of "input[0..input_length-1]". // Original contents of *compressed are lost. // // REQUIRES: "input[]" is not an alias of "*compressed". size_t Compress(const char* input, size_t input_length, std::string* compressed); - // Decompresses "compressed[0,compressed_length-1]" to "*uncompressed". + // Same as `Compress` above but taking an `iovec` array as input. Note that + // this function preprocesses the inputs to compute the sum of + // `iov[0..iov_cnt-1].iov_len` before reading. To avoid this, use + // `RawCompressFromIOVec` below. + size_t CompressFromIOVec(const struct iovec* iov, size_t iov_cnt, + std::string* compressed); + + // Decompresses "compressed[0..compressed_length-1]" to "*uncompressed". // Original contents of "*uncompressed" are lost. // // REQUIRES: "compressed[]" is not an alias of "*uncompressed". @@ -124,6 +131,12 @@ namespace snappy { char* compressed, size_t* compressed_length); + // Same as `RawCompress` above but taking an `iovec` array as input. Note that + // `uncompressed_length` is the total number of bytes to be read from the + // elements of `iov` (_not_ the number of elements in `iov`). + void RawCompressFromIOVec(const struct iovec* iov, size_t uncompressed_length, + char* compressed, size_t* compressed_length); + // Given data in "compressed[0..compressed_length-1]" generated by // calling the Snappy::Compress routine, this routine // stores the uncompressed data to diff --git a/snappy_benchmark.cc b/snappy_benchmark.cc index 9a54f9c..0590142 100644 --- a/snappy_benchmark.cc +++ b/snappy_benchmark.cc @@ -149,7 +149,55 @@ void BM_UValidateMedley(benchmark::State& state) { } BENCHMARK(BM_UValidateMedley); -void BM_UIOVec(benchmark::State& state) { +void BM_UIOVecSource(benchmark::State& state) { + // Pick file to process based on state.range(0). + int file_index = state.range(0); + + CHECK_GE(file_index, 0); + CHECK_LT(file_index, ARRAYSIZE(kTestDataFiles)); + std::string contents = + ReadTestDataFile(kTestDataFiles[file_index].filename, + kTestDataFiles[file_index].size_limit); + + // Create `iovec`s of the `contents`. + const int kNumEntries = 10; + struct iovec iov[kNumEntries]; + size_t used_so_far = 0; + for (int i = 0; i < kNumEntries; ++i) { + iov[i].iov_base = contents.data() + used_so_far; + if (used_so_far == contents.size()) { + iov[i].iov_len = 0; + continue; + } + if (i == kNumEntries - 1) { + iov[i].iov_len = contents.size() - used_so_far; + } else { + iov[i].iov_len = contents.size() / kNumEntries; + } + used_so_far += iov[i].iov_len; + } + + char* dst = new char[snappy::MaxCompressedLength(contents.size())]; + size_t zsize = 0; + for (auto s : state) { + snappy::RawCompressFromIOVec(iov, contents.size(), dst, &zsize); + benchmark::DoNotOptimize(iov); + } + state.SetBytesProcessed(static_cast(state.iterations()) * + static_cast(contents.size())); + const double compression_ratio = + static_cast(zsize) / std::max(1, contents.size()); + state.SetLabel(StrFormat("%s (%.2f %%)", kTestDataFiles[file_index].label, + 100.0 * compression_ratio)); + VLOG(0) << StrFormat("compression for %s: %d -> %d bytes", + kTestDataFiles[file_index].label, contents.size(), + zsize); + + delete[] dst; +} +BENCHMARK(BM_UIOVecSource)->DenseRange(0, ARRAYSIZE(kTestDataFiles) - 1); + +void BM_UIOVecSink(benchmark::State& state) { // Pick file to process based on state.range(0). int file_index = state.range(0); @@ -193,7 +241,7 @@ void BM_UIOVec(benchmark::State& state) { delete[] dst; } -BENCHMARK(BM_UIOVec)->DenseRange(0, 4); +BENCHMARK(BM_UIOVecSink)->DenseRange(0, 4); void BM_UFlatSink(benchmark::State& state) { // Pick file to process based on state.range(0). diff --git a/snappy_unittest.cc b/snappy_unittest.cc index 292004c..aeb8044 100644 --- a/snappy_unittest.cc +++ b/snappy_unittest.cc @@ -137,21 +137,10 @@ void VerifyStringSink(const std::string& input) { CHECK_EQ(uncompressed, input); } -void VerifyIOVec(const std::string& input) { - std::string compressed; - DataEndingAtUnreadablePage i(input); - const size_t written = snappy::Compress(i.data(), i.size(), &compressed); - CHECK_EQ(written, compressed.size()); - CHECK_LE(compressed.size(), - snappy::MaxCompressedLength(input.size())); - CHECK(snappy::IsValidCompressedBuffer(compressed.data(), compressed.size())); - - // Try uncompressing into an iovec containing a random number of entries - // ranging from 1 to 10. - char* buf = new char[input.size()]; +struct iovec* GetIOVec(const std::string& input, char*& buf, size_t& num) { std::minstd_rand0 rng(input.size()); std::uniform_int_distribution uniform_1_to_10(1, 10); - size_t num = uniform_1_to_10(rng); + num = uniform_1_to_10(rng); if (input.size() < num) { num = input.size(); } @@ -175,8 +164,40 @@ void VerifyIOVec(const std::string& input) { } used_so_far += iov[i].iov_len; } - CHECK(snappy::RawUncompressToIOVec( - compressed.data(), compressed.size(), iov, num)); + return iov; +} + +int VerifyIOVecSource(const std::string& input) { + std::string compressed; + std::string copy = input; + char* buf = copy.data(); + size_t num = 0; + struct iovec* iov = GetIOVec(input, buf, num); + const size_t written = snappy::CompressFromIOVec(iov, num, &compressed); + CHECK_EQ(written, compressed.size()); + CHECK_LE(compressed.size(), snappy::MaxCompressedLength(input.size())); + CHECK(snappy::IsValidCompressedBuffer(compressed.data(), compressed.size())); + + std::string uncompressed; + DataEndingAtUnreadablePage c(compressed); + CHECK(snappy::Uncompress(c.data(), c.size(), &uncompressed)); + CHECK_EQ(uncompressed, input); + delete[] iov; + return uncompressed.size(); +} + +void VerifyIOVecSink(const std::string& input) { + std::string compressed; + DataEndingAtUnreadablePage i(input); + const size_t written = snappy::Compress(i.data(), i.size(), &compressed); + CHECK_EQ(written, compressed.size()); + CHECK_LE(compressed.size(), snappy::MaxCompressedLength(input.size())); + CHECK(snappy::IsValidCompressedBuffer(compressed.data(), compressed.size())); + char* buf = new char[input.size()]; + size_t num = 0; + struct iovec* iov = GetIOVec(input, buf, num); + CHECK(snappy::RawUncompressToIOVec(compressed.data(), compressed.size(), iov, + num)); CHECK(!memcmp(buf, input.data(), input.size())); delete[] iov; delete[] buf; @@ -252,15 +273,18 @@ int Verify(const std::string& input) { // Compress using string based routines const int result = VerifyString(input); + // Compress using `iovec`-based routines. + CHECK_EQ(VerifyIOVecSource(input), result); + // Verify using sink based routines VerifyStringSink(input); VerifyNonBlockedCompression(input); - VerifyIOVec(input); + VerifyIOVecSink(input); if (!input.empty()) { const std::string expanded = Expand(input); VerifyNonBlockedCompression(expanded); - VerifyIOVec(input); + VerifyIOVecSink(input); } return result; @@ -540,7 +564,27 @@ TEST(Snappy, FourByteOffset) { CHECK_EQ(uncompressed, src); } -TEST(Snappy, IOVecEdgeCases) { +TEST(Snappy, IOVecSourceEdgeCases) { + // Validate that empty leading, trailing, and in-between iovecs are handled: + // [] [] ['a'] [] ['b'] []. + std::string data = "ab"; + char* buf = data.data(); + size_t used_so_far = 0; + static const int kLengths[] = {0, 0, 1, 0, 1, 0}; + struct iovec iov[ARRAYSIZE(kLengths)]; + for (int i = 0; i < ARRAYSIZE(kLengths); ++i) { + iov[i].iov_base = buf + used_so_far; + iov[i].iov_len = kLengths[i]; + used_so_far += kLengths[i]; + } + std::string compressed; + snappy::CompressFromIOVec(iov, ARRAYSIZE(kLengths), &compressed); + std::string uncompressed; + snappy::Uncompress(compressed.data(), compressed.size(), &uncompressed); + CHECK_EQ(data, uncompressed); +} + +TEST(Snappy, IOVecSinkEdgeCases) { // Test some tricky edge cases in the iovec output that are not necessarily // exercised by random tests.