Add support for uncompressing to iovecs (scatter I/O).

Windows does not have struct iovec defined anywhere,
so we define our own version that's equal to what UNIX
typically has.

The bulk of this patch was contributed by Mohit Aron.

R=jeff


git-svn-id: https://snappy.googlecode.com/svn/trunk@76 03e5f5b5-db94-4691-08a0-1a8bf15f6143
This commit is contained in:
snappy.mirrorbot@gmail.com 2013-06-13 16:19:52 +00:00
parent cd92eb0852
commit 328aafa198
6 changed files with 422 additions and 2 deletions

View File

@ -110,6 +110,11 @@ if test "$ac_cv_header_stddef_h" = "yes"; then
else else
AC_SUBST([ac_cv_have_stddef_h], [0]) AC_SUBST([ac_cv_have_stddef_h], [0])
fi fi
if test "$ac_cv_header_sys_uio_h" = "yes"; then
AC_SUBST([ac_cv_have_sys_uio_h], [1])
else
AC_SUBST([ac_cv_have_sys_uio_h], [0])
fi
# Export the version to snappy-stubs-public.h. # Export the version to snappy-stubs-public.h.
SNAPPY_MAJOR="snappy_major" SNAPPY_MAJOR="snappy_major"

View File

@ -44,6 +44,10 @@
#include <stddef.h> #include <stddef.h>
#endif #endif
#if @ac_cv_have_sys_uio_h@
#include <sys/uio.h>
#endif
#define SNAPPY_MAJOR @SNAPPY_MAJOR@ #define SNAPPY_MAJOR @SNAPPY_MAJOR@
#define SNAPPY_MINOR @SNAPPY_MINOR@ #define SNAPPY_MINOR @SNAPPY_MINOR@
#define SNAPPY_PATCHLEVEL @SNAPPY_PATCHLEVEL@ #define SNAPPY_PATCHLEVEL @SNAPPY_PATCHLEVEL@
@ -80,6 +84,15 @@ typedef std::string string;
TypeName(const TypeName&); \ TypeName(const TypeName&); \
void operator=(const TypeName&) void operator=(const TypeName&)
#if !@ac_cv_have_sys_uio_h@
// Windows does not have an iovec type, yet the concept is universally useful.
// It is simple to define it ourselves, so we put it inside our own namespace.
struct iovec {
void* iov_base;
size_t iov_len;
};
#endif
} // namespace snappy } // namespace snappy
#endif // UTIL_SNAPPY_OPENSOURCE_SNAPPY_STUBS_PUBLIC_H_ #endif // UTIL_SNAPPY_OPENSOURCE_SNAPPY_STUBS_PUBLIC_H_

View File

@ -328,6 +328,7 @@ class Benchmark {
(new Benchmark(#benchmark_name, benchmark_name)) (new Benchmark(#benchmark_name, benchmark_name))
extern Benchmark* Benchmark_BM_UFlat; extern Benchmark* Benchmark_BM_UFlat;
extern Benchmark* Benchmark_BM_UIOVec;
extern Benchmark* Benchmark_BM_UValidate; extern Benchmark* Benchmark_BM_UValidate;
extern Benchmark* Benchmark_BM_ZFlat; extern Benchmark* Benchmark_BM_ZFlat;
@ -478,6 +479,7 @@ static void RunSpecifiedBenchmarks() {
fprintf(stderr, "---------------------------------------------------\n"); fprintf(stderr, "---------------------------------------------------\n");
snappy::Benchmark_BM_UFlat->Run(); snappy::Benchmark_BM_UFlat->Run();
snappy::Benchmark_BM_UIOVec->Run();
snappy::Benchmark_BM_UValidate->Run(); snappy::Benchmark_BM_UValidate->Run();
snappy::Benchmark_BM_ZFlat->Run(); snappy::Benchmark_BM_ZFlat->Run();

177
snappy.cc
View File

@ -944,6 +944,183 @@ size_t Compress(Source* reader, Sink* writer) {
return written; return written;
} }
// -----------------------------------------------------------------------
// IOVec interfaces
// -----------------------------------------------------------------------
// A type that writes to an iovec.
// Note that this is not a "ByteSink", but a type that matches the
// Writer template argument to SnappyDecompressor::DecompressAllTags().
class SnappyIOVecWriter {
private:
const struct iovec* output_iov_;
const size_t output_iov_count_;
// We are currently writing into output_iov_[curr_iov_index_].
int curr_iov_index_;
// Bytes written to output_iov_[curr_iov_index_] so far.
size_t curr_iov_written_;
// Total bytes decompressed into output_iov_ so far.
size_t total_written_;
// Maximum number of bytes that will be decompressed into output_iov_.
size_t output_limit_;
inline char* GetIOVecPointer(int index, size_t offset) {
return reinterpret_cast<char*>(output_iov_[index].iov_base) +
offset;
}
public:
// Does not take ownership of iov. iov must be valid during the
// entire lifetime of the SnappyIOVecWriter.
inline SnappyIOVecWriter(const struct iovec* iov, size_t iov_count)
: output_iov_(iov),
output_iov_count_(iov_count),
curr_iov_index_(0),
curr_iov_written_(0),
total_written_(0),
output_limit_(-1) {
}
inline void SetExpectedLength(size_t len) {
output_limit_ = len;
}
inline bool CheckLength() const {
return total_written_ == output_limit_;
}
inline bool Append(const char* ip, size_t len) {
if (total_written_ + len > output_limit_) {
return false;
}
while (len > 0) {
assert(curr_iov_written_ <= output_iov_[curr_iov_index_].iov_len);
if (curr_iov_written_ >= output_iov_[curr_iov_index_].iov_len) {
// This iovec is full. Go to the next one.
if (curr_iov_index_ + 1 >= output_iov_count_) {
return false;
}
curr_iov_written_ = 0;
++curr_iov_index_;
}
const size_t to_write = std::min(
len, output_iov_[curr_iov_index_].iov_len - curr_iov_written_);
memcpy(GetIOVecPointer(curr_iov_index_, curr_iov_written_),
ip,
to_write);
curr_iov_written_ += to_write;
total_written_ += to_write;
ip += to_write;
len -= to_write;
}
return true;
}
inline bool TryFastAppend(const char* ip, size_t available, size_t len) {
const size_t space_left = output_limit_ - total_written_;
if (len <= 16 && available >= 16 && space_left >= 16 &&
output_iov_[curr_iov_index_].iov_len - curr_iov_written_ >= 16) {
// Fast path, used for the majority (about 95%) of invocations.
char* ptr = GetIOVecPointer(curr_iov_index_, curr_iov_written_);
UnalignedCopy64(ip, ptr);
UnalignedCopy64(ip + 8, ptr + 8);
curr_iov_written_ += len;
total_written_ += len;
return true;
}
return false;
}
inline bool AppendFromSelf(size_t offset, size_t len) {
if (offset > total_written_ || offset == 0) {
return false;
}
const size_t space_left = output_limit_ - total_written_;
if (len > space_left) {
return false;
}
// Locate the iovec from which we need to start the copy.
int from_iov_index = curr_iov_index_;
size_t from_iov_offset = curr_iov_written_;
while (offset > 0) {
if (from_iov_offset >= offset) {
from_iov_offset -= offset;
break;
}
offset -= from_iov_offset;
--from_iov_index;
assert(from_iov_index >= 0);
from_iov_offset = output_iov_[from_iov_index].iov_len;
}
// Copy <len> bytes starting from the iovec pointed to by from_iov_index to
// the current iovec.
while (len > 0) {
assert(from_iov_index <= curr_iov_index_);
if (from_iov_index != curr_iov_index_) {
const size_t to_copy = std::min(
output_iov_[from_iov_index].iov_len - from_iov_offset,
len);
Append(GetIOVecPointer(from_iov_index, from_iov_offset), to_copy);
len -= to_copy;
if (len > 0) {
++from_iov_index;
from_iov_offset = 0;
}
} else {
assert(curr_iov_written_ <= output_iov_[curr_iov_index_].iov_len);
size_t to_copy = std::min(output_iov_[curr_iov_index_].iov_len -
curr_iov_written_,
len);
if (to_copy == 0) {
// This iovec is full. Go to the next one.
if (curr_iov_index_ + 1 >= output_iov_count_) {
return false;
}
++curr_iov_index_;
curr_iov_written_ = 0;
continue;
}
if (to_copy > len) {
to_copy = len;
}
IncrementalCopy(GetIOVecPointer(from_iov_index, from_iov_offset),
GetIOVecPointer(curr_iov_index_, curr_iov_written_),
to_copy);
curr_iov_written_ += to_copy;
from_iov_offset += to_copy;
total_written_ += to_copy;
len -= to_copy;
}
}
return true;
}
};
bool RawUncompressToIOVec(const char* compressed, size_t compressed_length,
const struct iovec* iov, size_t iov_cnt) {
ByteArraySource reader(compressed, compressed_length);
return RawUncompressToIOVec(&reader, iov, iov_cnt);
}
bool RawUncompressToIOVec(Source* compressed, const struct iovec* iov,
size_t iov_cnt) {
SnappyIOVecWriter output(iov, iov_cnt);
return InternalUncompress(compressed, &output);
}
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
// Flat array interfaces // Flat array interfaces
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------

View File

@ -124,6 +124,28 @@ namespace snappy {
// returns false if the message is corrupted and could not be decrypted // returns false if the message is corrupted and could not be decrypted
bool RawUncompress(Source* compressed, char* uncompressed); bool RawUncompress(Source* compressed, char* uncompressed);
// Given data in "compressed[0..compressed_length-1]" generated by
// calling the Snappy::Compress routine, this routine
// stores the uncompressed data to the iovec "iov". The number of physical
// buffers in "iov" is given by iov_cnt and their cumulative size
// must be at least GetUncompressedLength(compressed). The individual buffers
// in "iov" must not overlap with each other.
//
// returns false if the message is corrupted and could not be decrypted
bool RawUncompressToIOVec(const char* compressed, size_t compressed_length,
const struct iovec* iov, size_t iov_cnt);
// Given data from the byte source 'compressed' generated by calling
// the Snappy::Compress routine, this routine stores the uncompressed
// data to the iovec "iov". The number of physical
// buffers in "iov" is given by iov_cnt and their cumulative size
// must be at least GetUncompressedLength(compressed). The individual buffers
// in "iov" must not overlap with each other.
//
// returns false if the message is corrupted and could not be decrypted
bool RawUncompressToIOVec(Source* compressed, const struct iovec* iov,
size_t iov_cnt);
// Returns the maximal size of the compressed representation of // Returns the maximal size of the compressed representation of
// input data that is "source_bytes" bytes in length; // input data that is "source_bytes" bytes in length;
size_t MaxCompressedLength(size_t source_bytes); size_t MaxCompressedLength(size_t source_bytes);

View File

@ -492,6 +492,46 @@ static int VerifyString(const string& input) {
} }
static void VerifyIOVec(const string& input) {
string compressed;
DataEndingAtUnreadablePage i(input);
const size_t written = snappy::Compress(i.data(), i.size(), &compressed);
CHECK_EQ(written, compressed.size());
CHECK_LE(compressed.size(),
snappy::MaxCompressedLength(input.size()));
CHECK(snappy::IsValidCompressedBuffer(compressed.data(), compressed.size()));
// Try uncompressing into an iovec containing a random number of entries
// ranging from 1 to 10.
char* buf = new char[input.size()];
ACMRandom rnd(input.size());
int num = rnd.Next() % 10 + 1;
if (input.size() < num) {
num = input.size();
}
struct iovec* iov = new iovec[num];
int used_so_far = 0;
for (int i = 0; i < num; ++i) {
iov[i].iov_base = buf + used_so_far;
if (i == num - 1) {
iov[i].iov_len = input.size() - used_so_far;
} else {
// Randomly choose to insert a 0 byte entry.
if (rnd.OneIn(5)) {
iov[i].iov_len = 0;
} else {
iov[i].iov_len = rnd.Uniform(input.size());
}
}
used_so_far += iov[i].iov_len;
}
CHECK(snappy::RawUncompressToIOVec(
compressed.data(), compressed.size(), iov, num));
CHECK(!memcmp(buf, input.data(), input.size()));
delete[] iov;
delete[] buf;
}
// Test that data compressed by a compressor that does not // Test that data compressed by a compressor that does not
// obey block sizes is uncompressed properly. // obey block sizes is uncompressed properly.
static void VerifyNonBlockedCompression(const string& input) { static void VerifyNonBlockedCompression(const string& input) {
@ -542,8 +582,11 @@ static int Verify(const string& input) {
VerifyNonBlockedCompression(input); VerifyNonBlockedCompression(input);
VerifyIOVec(input);
if (!input.empty()) { if (!input.empty()) {
VerifyNonBlockedCompression(Expand(input)); const string expanded = Expand(input);
VerifyNonBlockedCompression(expanded);
VerifyIOVec(input);
} }
@ -664,7 +707,7 @@ static void AppendCopy(string* dst, int offset, int length) {
} }
length -= to_copy; length -= to_copy;
if ((to_copy < 12) && (offset < 2048)) { if ((to_copy >= 4) && (to_copy < 12) && (offset < 2048)) {
assert(to_copy-4 < 8); // Must fit in 3 bits assert(to_copy-4 < 8); // Must fit in 3 bits
dst->push_back(1 | ((to_copy-4) << 2) | ((offset >> 8) << 5)); dst->push_back(1 | ((to_copy-4) << 2) | ((offset >> 8) << 5));
dst->push_back(offset & 0xff); dst->push_back(offset & 0xff);
@ -774,6 +817,118 @@ TEST(Snappy, FourByteOffset) {
CHECK_EQ(uncompressed, src); CHECK_EQ(uncompressed, src);
} }
TEST(Snappy, IOVecEdgeCases) {
// Test some tricky edge cases in the iovec output that are not necessarily
// exercised by random tests.
// Our output blocks look like this initially (the last iovec is bigger
// than depicted):
// [ ] [ ] [ ] [ ] [ ]
static const int kLengths[] = { 2, 1, 4, 8, 128 };
struct iovec iov[ARRAYSIZE(kLengths)];
for (int i = 0; i < ARRAYSIZE(kLengths); ++i) {
iov[i].iov_base = new char[kLengths[i]];
iov[i].iov_len = kLengths[i];
}
string compressed;
Varint::Append32(&compressed, 22);
// A literal whose output crosses three blocks.
// [ab] [c] [123 ] [ ] [ ]
AppendLiteral(&compressed, "abc123");
// A copy whose output crosses two blocks (source and destination
// segments marked).
// [ab] [c] [1231] [23 ] [ ]
// ^--^ --
AppendCopy(&compressed, 3, 3);
// A copy where the input is, at first, in the block before the output:
//
// [ab] [c] [1231] [231231 ] [ ]
// ^--- ^---
// Then during the copy, the pointers move such that the input and
// output pointers are in the same block:
//
// [ab] [c] [1231] [23123123] [ ]
// ^- ^-
// And then they move again, so that the output pointer is no longer
// in the same block as the input pointer:
// [ab] [c] [1231] [23123123] [123 ]
// ^-- ^--
AppendCopy(&compressed, 6, 9);
// Finally, a copy where the input is from several blocks back,
// and it also crosses three blocks:
//
// [ab] [c] [1231] [23123123] [123b ]
// ^ ^
// [ab] [c] [1231] [23123123] [123bc ]
// ^ ^
// [ab] [c] [1231] [23123123] [123bc12 ]
// ^- ^-
AppendCopy(&compressed, 17, 4);
CHECK(snappy::RawUncompressToIOVec(
compressed.data(), compressed.size(), iov, ARRAYSIZE(iov)));
CHECK_EQ(0, memcmp(iov[0].iov_base, "ab", 2));
CHECK_EQ(0, memcmp(iov[1].iov_base, "c", 1));
CHECK_EQ(0, memcmp(iov[2].iov_base, "1231", 4));
CHECK_EQ(0, memcmp(iov[3].iov_base, "23123123", 8));
CHECK_EQ(0, memcmp(iov[4].iov_base, "123bc12", 7));
for (int i = 0; i < ARRAYSIZE(kLengths); ++i) {
delete[] reinterpret_cast<char *>(iov[i].iov_base);
}
}
TEST(Snappy, IOVecLiteralOverflow) {
static const int kLengths[] = { 3, 4 };
struct iovec iov[ARRAYSIZE(kLengths)];
for (int i = 0; i < ARRAYSIZE(kLengths); ++i) {
iov[i].iov_base = new char[kLengths[i]];
iov[i].iov_len = kLengths[i];
}
string compressed;
Varint::Append32(&compressed, 8);
AppendLiteral(&compressed, "12345678");
CHECK(!snappy::RawUncompressToIOVec(
compressed.data(), compressed.size(), iov, ARRAYSIZE(iov)));
for (int i = 0; i < ARRAYSIZE(kLengths); ++i) {
delete[] reinterpret_cast<char *>(iov[i].iov_base);
}
}
TEST(Snappy, IOVecCopyOverflow) {
static const int kLengths[] = { 3, 4 };
struct iovec iov[ARRAYSIZE(kLengths)];
for (int i = 0; i < ARRAYSIZE(kLengths); ++i) {
iov[i].iov_base = new char[kLengths[i]];
iov[i].iov_len = kLengths[i];
}
string compressed;
Varint::Append32(&compressed, 8);
AppendLiteral(&compressed, "123");
AppendCopy(&compressed, 3, 5);
CHECK(!snappy::RawUncompressToIOVec(
compressed.data(), compressed.size(), iov, ARRAYSIZE(iov)));
for (int i = 0; i < ARRAYSIZE(kLengths); ++i) {
delete[] reinterpret_cast<char *>(iov[i].iov_base);
}
}
static bool CheckUncompressedLength(const string& compressed, static bool CheckUncompressedLength(const string& compressed,
size_t* ulength) { size_t* ulength) {
@ -1106,6 +1261,52 @@ static void BM_UValidate(int iters, int arg) {
} }
BENCHMARK(BM_UValidate)->DenseRange(0, 4); BENCHMARK(BM_UValidate)->DenseRange(0, 4);
static void BM_UIOVec(int iters, int arg) {
StopBenchmarkTiming();
// Pick file to process based on "arg"
CHECK_GE(arg, 0);
CHECK_LT(arg, ARRAYSIZE(files));
string contents = ReadTestDataFile(files[arg].filename,
files[arg].size_limit);
string zcontents;
snappy::Compress(contents.data(), contents.size(), &zcontents);
// Uncompress into an iovec containing ten entries.
const int kNumEntries = 10;
struct iovec iov[kNumEntries];
char *dst = new char[contents.size()];
int used_so_far = 0;
for (int i = 0; i < kNumEntries; ++i) {
iov[i].iov_base = dst + used_so_far;
if (used_so_far == contents.size()) {
iov[i].iov_len = 0;
continue;
}
if (i == kNumEntries - 1) {
iov[i].iov_len = contents.size() - used_so_far;
} else {
iov[i].iov_len = contents.size() / kNumEntries;
}
used_so_far += iov[i].iov_len;
}
SetBenchmarkBytesProcessed(static_cast<int64>(iters) *
static_cast<int64>(contents.size()));
SetBenchmarkLabel(files[arg].label);
StartBenchmarkTiming();
while (iters-- > 0) {
CHECK(snappy::RawUncompressToIOVec(zcontents.data(), zcontents.size(), iov,
kNumEntries));
}
StopBenchmarkTiming();
delete[] dst;
}
BENCHMARK(BM_UIOVec)->DenseRange(0, 4);
static void BM_ZFlat(int iters, int arg) { static void BM_ZFlat(int iters, int arg) {
StopBenchmarkTiming(); StopBenchmarkTiming();