Add support for Uncompress(source, sink). Various changes to allow

Uncompress(source, sink) to get the same performance as the different
variants of Uncompress to Cord/DataBuffer/String/FlatBuffer.

Changes to efficiently support Uncompress(source, sink)
--------

a) For strings - we add support to StringByteSink to do GetAppendBuffer so we
   can write to it without copying.
b) For flat array buffers, we do GetAppendBuffer and see if we can get a full buffer.

With the above changes we get performance with ByteSource/ByteSink
that is	very close to directly using flat arrays and strings.

We add various benchmark cases to demonstrate that.

Orthogonal change
------------------

Add support for TryFastAppend() for SnappyScatteredWriter.

Benchmark results are below

CPU: Intel Core2 dL1:32KB dL2:4096KB
Benchmark              Time(ns)    CPU(ns) Iterations
-----------------------------------------------------
BM_UFlat/0               109065     108996       6410 896.0MB/s  html
BM_UFlat/1              1012175    1012343        691 661.4MB/s  urls
BM_UFlat/2                26775      26771      26149 4.4GB/s  jpg
BM_UFlat/3                48947      48940      14363 1.8GB/s  pdf
BM_UFlat/4               441029     440835       1589 886.1MB/s  html4
BM_UFlat/5                39861      39880      17823 588.3MB/s  cp
BM_UFlat/6                18315      18300      38126 581.1MB/s  c
BM_UFlat/7                 5254       5254     100000 675.4MB/s  lsp
BM_UFlat/8              1568060    1567376        447 626.6MB/s  xls
BM_UFlat/9               337512     337734       2073 429.5MB/s  txt1
BM_UFlat/10              287269     287054       2434 415.9MB/s  txt2
BM_UFlat/11              890098     890219        787 457.2MB/s  txt3
BM_UFlat/12             1186593    1186863        590 387.2MB/s  txt4
BM_UFlat/13              573927     573318       1000 853.7MB/s  bin
BM_UFlat/14               64250      64294      10000 567.2MB/s  sum
BM_UFlat/15                7301       7300      96153 552.2MB/s  man
BM_UFlat/16              109617     109636       6375 1031.5MB/s  pb
BM_UFlat/17              364438     364497       1921 482.3MB/s  gaviota
BM_UFlatSink/0           108518     108465       6450 900.4MB/s  html
BM_UFlatSink/1           991952     991997        705 675.0MB/s  urls
BM_UFlatSink/2            26815      26798      26065 4.4GB/s  jpg
BM_UFlatSink/3            49127      49122      14255 1.8GB/s  pdf
BM_UFlatSink/4           436674     436731       1604 894.4MB/s  html4
BM_UFlatSink/5            39738      39733      17345 590.5MB/s  cp
BM_UFlatSink/6            18413      18416      37962 577.4MB/s  c
BM_UFlatSink/7             5677       5676     100000 625.2MB/s  lsp
BM_UFlatSink/8          1552175    1551026        451 633.2MB/s  xls
BM_UFlatSink/9           338526     338489       2065 428.5MB/s  txt1
BM_UFlatSink/10          289387     289307       2420 412.6MB/s  txt2
BM_UFlatSink/11          893803     893706        783 455.4MB/s  txt3
BM_UFlatSink/12         1195919    1195459        586 384.4MB/s  txt4
BM_UFlatSink/13          559637     559779       1000 874.3MB/s  bin
BM_UFlatSink/14           65073      65094      10000 560.2MB/s  sum
BM_UFlatSink/15            7618       7614      92823 529.5MB/s  man
BM_UFlatSink/16          110085     110121       6352 1027.0MB/s  pb
BM_UFlatSink/17          369196     368915       1896 476.5MB/s  gaviota
BM_UValidate/0            46954      46957      14899 2.0GB/s  html
BM_UValidate/1           500621     500868       1000 1.3GB/s  urls
BM_UValidate/2              283        283    2481447 417.2GB/s  jpg
BM_UValidate/3            16230      16228      43137 5.4GB/s  pdf
BM_UValidate/4           189129     189193       3701 2.0GB/s  html4

A=uday
R=sanjay
This commit is contained in:
Steinar H. Gunderson 2015-06-22 16:03:28 +02:00
parent b2ad960067
commit b2312c4c25
5 changed files with 420 additions and 1 deletions

View File

@ -40,6 +40,21 @@ char* Sink::GetAppendBuffer(size_t length, char* scratch) {
return scratch; return scratch;
} }
char* Sink::GetAppendBufferVariable(
size_t min_size, size_t desired_size_hint, char* scratch,
size_t scratch_size, size_t* allocated_size) {
*allocated_size = scratch_size;
return scratch;
}
void Sink::AppendAndTakeOwnership(
char* bytes, size_t n,
void (*deleter)(void*, const char*, size_t),
void *deleter_arg) {
Append(bytes, n);
(*deleter)(deleter_arg, bytes, n);
}
ByteArraySource::~ByteArraySource() { } ByteArraySource::~ByteArraySource() { }
size_t ByteArraySource::Available() const { return left_; } size_t ByteArraySource::Available() const { return left_; }
@ -68,4 +83,22 @@ char* UncheckedByteArraySink::GetAppendBuffer(size_t len, char* scratch) {
return dest_; return dest_;
} }
void UncheckedByteArraySink::AppendAndTakeOwnership(
char* data, size_t n,
void (*deleter)(void*, const char*, size_t),
void *deleter_arg) {
if (data != dest_) {
memcpy(dest_, data, n);
(*deleter)(deleter_arg, data, n);
}
dest_ += n;
} }
char* UncheckedByteArraySink::GetAppendBufferVariable(
size_t min_size, size_t desired_size_hint, char* scratch,
size_t scratch_size, size_t* allocated_size) {
*allocated_size = desired_size_hint;
return dest_;
}
} // namespace snappy

View File

@ -59,6 +59,47 @@ class Sink {
// The default implementation always returns the scratch buffer. // The default implementation always returns the scratch buffer.
virtual char* GetAppendBuffer(size_t length, char* scratch); virtual char* GetAppendBuffer(size_t length, char* scratch);
// For higher performance, Sink implementations can provide custom
// AppendAndTakeOwnership() and GetAppendBufferVariable() methods.
// These methods can reduce the number of copies done during
// compression/decompression.
// Append "bytes[0,n-1] to the sink. Takes ownership of "bytes"
// and calls the deleter function as (*deleter)(deleter_arg, bytes, n)
// to free the buffer. deleter function must be non NULL.
//
// The default implementation just calls Append and frees "bytes".
// Other implementations may avoid a copy while appending the buffer.
virtual void AppendAndTakeOwnership(
char* bytes, size_t n, void (*deleter)(void*, const char*, size_t),
void *deleter_arg);
// Returns a writable buffer for appending and writes the buffer's capacity to
// *allocated_size. Guarantees *allocated_size >= min_size.
// May return a pointer to the caller-owned scratch buffer which must have
// scratch_size >= min_size.
//
// The returned buffer is only valid until the next operation
// on this ByteSink.
//
// After writing at most *allocated_size bytes, call Append() with the
// pointer returned from this function and the number of bytes written.
// Many Append() implementations will avoid copying bytes if this function
// returned an internal buffer.
//
// If the sink implementation allocates or reallocates an internal buffer,
// it should use the desired_size_hint if appropriate. If a caller cannot
// provide a reasonable guess at the desired capacity, it should set
// desired_size_hint = 0.
//
// If a non-scratch buffer is returned, the caller may only pass
// a prefix to it to Append(). That is, it is not correct to pass an
// interior pointer to Append().
//
// The default implementation always returns the scratch buffer.
virtual char* GetAppendBufferVariable(
size_t min_size, size_t desired_size_hint, char* scratch,
size_t scratch_size, size_t* allocated_size);
private: private:
// No copying // No copying
@ -121,6 +162,12 @@ class UncheckedByteArraySink : public Sink {
virtual ~UncheckedByteArraySink(); virtual ~UncheckedByteArraySink();
virtual void Append(const char* data, size_t n); virtual void Append(const char* data, size_t n);
virtual char* GetAppendBuffer(size_t len, char* scratch); virtual char* GetAppendBuffer(size_t len, char* scratch);
virtual char* GetAppendBufferVariable(
size_t min_size, size_t desired_size_hint, char* scratch,
size_t scratch_size, size_t* allocated_size);
virtual void AppendAndTakeOwnership(
char* bytes, size_t n, void (*deleter)(void*, const char*, size_t),
void *deleter_arg);
// Return the current output pointer so that a caller can see how // Return the current output pointer so that a caller can see how
// many bytes were produced. // many bytes were produced.

249
snappy.cc
View File

@ -863,6 +863,7 @@ static bool InternalUncompressAllTags(SnappyDecompressor* decompressor,
// Process the entire input // Process the entire input
decompressor->DecompressAllTags(writer); decompressor->DecompressAllTags(writer);
writer->Flush();
return (decompressor->eof() && writer->CheckLength()); return (decompressor->eof() && writer->CheckLength());
} }
@ -1115,6 +1116,7 @@ class SnappyIOVecWriter {
return true; return true;
} }
inline void Flush() {}
}; };
bool RawUncompressToIOVec(const char* compressed, size_t compressed_length, bool RawUncompressToIOVec(const char* compressed, size_t compressed_length,
@ -1215,6 +1217,10 @@ class SnappyArrayWriter {
op_ = op + len; op_ = op + len;
return true; return true;
} }
inline size_t Produced() const {
return op_ - base_;
}
inline void Flush() {}
}; };
bool RawUncompress(const char* compressed, size_t n, char* uncompressed) { bool RawUncompress(const char* compressed, size_t n, char* uncompressed) {
@ -1269,6 +1275,7 @@ class SnappyDecompressionValidator {
produced_ += len; produced_ += len;
return produced_ <= expected_; return produced_ <= expected_;
} }
inline void Flush() {}
}; };
bool IsValidCompressedBuffer(const char* compressed, size_t n) { bool IsValidCompressedBuffer(const char* compressed, size_t n) {
@ -1277,6 +1284,11 @@ bool IsValidCompressedBuffer(const char* compressed, size_t n) {
return InternalUncompress(&reader, &writer); return InternalUncompress(&reader, &writer);
} }
bool IsValidCompressed(Source* compressed) {
SnappyDecompressionValidator writer;
return InternalUncompress(compressed, &writer);
}
void RawCompress(const char* input, void RawCompress(const char* input,
size_t input_length, size_t input_length,
char* compressed, char* compressed,
@ -1300,6 +1312,241 @@ size_t Compress(const char* input, size_t input_length, string* compressed) {
return compressed_length; return compressed_length;
} }
// -----------------------------------------------------------------------
// Sink interface
// -----------------------------------------------------------------------
// A type that decompresses into a Sink. The template parameter
// Allocator must export one method "char* Allocate(int size);", which
// allocates a buffer of "size" and appends that to the destination.
template <typename Allocator>
class SnappyScatteredWriter {
Allocator allocator_;
// We need random access into the data generated so far. Therefore
// we keep track of all of the generated data as an array of blocks.
// All of the blocks except the last have length kBlockSize.
vector<char*> blocks_;
size_t expected_;
// Total size of all fully generated blocks so far
size_t full_size_;
// Pointer into current output block
char* op_base_; // Base of output block
char* op_ptr_; // Pointer to next unfilled byte in block
char* op_limit_; // Pointer just past block
inline size_t Size() const {
return full_size_ + (op_ptr_ - op_base_);
}
bool SlowAppend(const char* ip, size_t len);
bool SlowAppendFromSelf(size_t offset, size_t len);
public:
inline explicit SnappyScatteredWriter(const Allocator& allocator)
: allocator_(allocator),
full_size_(0),
op_base_(NULL),
op_ptr_(NULL),
op_limit_(NULL) {
}
inline void SetExpectedLength(size_t len) {
assert(blocks_.empty());
expected_ = len;
}
inline bool CheckLength() const {
return Size() == expected_;
}
// Return the number of bytes actually uncompressed so far
inline size_t Produced() const {
return Size();
}
inline bool Append(const char* ip, size_t len) {
size_t avail = op_limit_ - op_ptr_;
if (len <= avail) {
// Fast path
memcpy(op_ptr_, ip, len);
op_ptr_ += len;
return true;
} else {
return SlowAppend(ip, len);
}
}
inline bool TryFastAppend(const char* ip, size_t available, size_t length) {
char* op = op_ptr_;
const int space_left = op_limit_ - op;
if (length <= 16 && available >= 16 + kMaximumTagLength &&
space_left >= 16) {
// Fast path, used for the majority (about 95%) of invocations.
UNALIGNED_STORE64(op, UNALIGNED_LOAD64(ip));
UNALIGNED_STORE64(op + 8, UNALIGNED_LOAD64(ip + 8));
op_ptr_ = op + length;
return true;
} else {
return false;
}
}
inline bool AppendFromSelf(size_t offset, size_t len) {
// See SnappyArrayWriter::AppendFromSelf for an explanation of
// the "offset - 1u" trick.
if (offset - 1u < op_ptr_ - op_base_) {
const size_t space_left = op_limit_ - op_ptr_;
if (space_left >= len + kMaxIncrementCopyOverflow) {
// Fast path: src and dst in current block.
IncrementalCopyFastPath(op_ptr_ - offset, op_ptr_, len);
op_ptr_ += len;
return true;
}
}
return SlowAppendFromSelf(offset, len);
}
// Called at the end of the decompress. We ask the allocator
// write all blocks to the sink.
inline void Flush() { allocator_.Flush(Produced()); }
};
template<typename Allocator>
bool SnappyScatteredWriter<Allocator>::SlowAppend(const char* ip, size_t len) {
size_t avail = op_limit_ - op_ptr_;
while (len > avail) {
// Completely fill this block
memcpy(op_ptr_, ip, avail);
op_ptr_ += avail;
assert(op_limit_ - op_ptr_ == 0);
full_size_ += (op_ptr_ - op_base_);
len -= avail;
ip += avail;
// Bounds check
if (full_size_ + len > expected_) {
return false;
}
// Make new block
size_t bsize = min<size_t>(kBlockSize, expected_ - full_size_);
op_base_ = allocator_.Allocate(bsize);
op_ptr_ = op_base_;
op_limit_ = op_base_ + bsize;
blocks_.push_back(op_base_);
avail = bsize;
}
memcpy(op_ptr_, ip, len);
op_ptr_ += len;
return true;
}
template<typename Allocator>
bool SnappyScatteredWriter<Allocator>::SlowAppendFromSelf(size_t offset,
size_t len) {
// Overflow check
// See SnappyArrayWriter::AppendFromSelf for an explanation of
// the "offset - 1u" trick.
const size_t cur = Size();
if (offset - 1u >= cur) return false;
if (expected_ - cur < len) return false;
// Currently we shouldn't ever hit this path because Compress() chops the
// input into blocks and does not create cross-block copies. However, it is
// nice if we do not rely on that, since we can get better compression if we
// allow cross-block copies and thus might want to change the compressor in
// the future.
size_t src = cur - offset;
while (len-- > 0) {
char c = blocks_[src >> kBlockLog][src & (kBlockSize-1)];
Append(&c, 1);
src++;
}
return true;
}
class SnappySinkAllocator {
public:
explicit SnappySinkAllocator(Sink* dest): dest_(dest) {}
~SnappySinkAllocator() {}
char* Allocate(int size) {
Datablock block(new char[size], size);
blocks_.push_back(block);
return block.data;
}
// We flush only at the end, because the writer wants
// random access to the blocks and once we hand the
// block over to the sink, we can't access it anymore.
// Also we don't write more than has been actually written
// to the blocks.
void Flush(size_t size) {
size_t size_written = 0;
size_t block_size;
for (int i = 0; i < blocks_.size(); ++i) {
block_size = min<size_t>(blocks_[i].size, size - size_written);
dest_->AppendAndTakeOwnership(blocks_[i].data, block_size,
&SnappySinkAllocator::Deleter, NULL);
size_written += block_size;
}
blocks_.clear();
}
private:
struct Datablock {
char* data;
size_t size;
Datablock(char* p, size_t s) : data(p), size(s) {}
};
static void Deleter(void* arg, const char* bytes, size_t size) {
delete[] bytes;
}
Sink* dest_;
vector<Datablock> blocks_;
// Note: copying this object is allowed
};
size_t UncompressAsMuchAsPossible(Source* compressed, Sink* uncompressed) {
SnappySinkAllocator allocator(uncompressed);
SnappyScatteredWriter<SnappySinkAllocator> writer(allocator);
InternalUncompress(compressed, &writer);
return writer.Produced();
}
bool Uncompress(Source* compressed, Sink* uncompressed) {
// Read the uncompressed length from the front of the compressed input
SnappyDecompressor decompressor(compressed);
uint32 uncompressed_len = 0;
if (!decompressor.ReadUncompressedLength(&uncompressed_len)) {
return false;
}
char c;
size_t allocated_size;
char* buf = uncompressed->GetAppendBufferVariable(
1, uncompressed_len, &c, 1, &allocated_size);
// If we can get a flat buffer, then use it, otherwise do block by block
// uncompression
if (allocated_size >= uncompressed_len) {
SnappyArrayWriter writer(buf);
bool result = InternalUncompressAllTags(
&decompressor, &writer, uncompressed_len);
uncompressed->Append(buf, writer.Produced());
return result;
} else {
SnappySinkAllocator allocator(uncompressed);
SnappyScatteredWriter<SnappySinkAllocator> writer(allocator);
return InternalUncompressAllTags(&decompressor, &writer, uncompressed_len);
}
}
} // end namespace snappy } // end namespace snappy

View File

@ -84,6 +84,18 @@ namespace snappy {
bool Uncompress(const char* compressed, size_t compressed_length, bool Uncompress(const char* compressed, size_t compressed_length,
string* uncompressed); string* uncompressed);
// Decompresses "compressed" to "*uncompressed".
//
// returns false if the message is corrupted and could not be decompressed
bool Uncompress(Source* compressed, Sink* uncompressed);
// This routine uncompresses as much of the "compressed" as possible
// into sink. It returns the number of valid bytes added to sink
// (extra invalid bytes may have been added due to errors; the caller
// should ignore those). The emitted data typically has length
// GetUncompressedLength(), but may be shorter if an error is
// encountered.
size_t UncompressAsMuchAsPossible(Source* compressed, Sink* uncompressed);
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Lower-level character array based routines. May be useful for // Lower-level character array based routines. May be useful for
@ -164,6 +176,14 @@ namespace snappy {
bool IsValidCompressedBuffer(const char* compressed, bool IsValidCompressedBuffer(const char* compressed,
size_t compressed_length); size_t compressed_length);
// Returns true iff the contents of "compressed" can be uncompressed
// successfully. Does not return the uncompressed data. Takes
// time proportional to *compressed length, but is usually at least
// a factor of four faster than actual decompression.
// On success, consumes all of *compressed. On failure, consumes an
// unspecified prefix of *compressed.
bool IsValidCompressed(Source* compressed);
// The size of a compression block. Note that many parts of the compression // The size of a compression block. Note that many parts of the compression
// code assumes that kBlockSize <= 65536; in particular, the hash table // code assumes that kBlockSize <= 65536; in particular, the hash table
// can only store 16-bit offsets, and EmitCopy() also assumes the offset // can only store 16-bit offsets, and EmitCopy() also assumes the offset

View File

@ -488,6 +488,23 @@ static int VerifyString(const string& input) {
return uncompressed.size(); return uncompressed.size();
} }
static void VerifyStringSink(const string& input) {
string compressed;
DataEndingAtUnreadablePage i(input);
const size_t written = snappy::Compress(i.data(), i.size(), &compressed);
CHECK_EQ(written, compressed.size());
CHECK_LE(compressed.size(),
snappy::MaxCompressedLength(input.size()));
CHECK(snappy::IsValidCompressedBuffer(compressed.data(), compressed.size()));
string uncompressed;
uncompressed.resize(input.size());
snappy::UncheckedByteArraySink sink(string_as_array(&uncompressed));
DataEndingAtUnreadablePage c(compressed);
snappy::ByteArraySource source(c.data(), c.size());
CHECK(snappy::Uncompress(&source, &sink));
CHECK_EQ(uncompressed, input);
}
static void VerifyIOVec(const string& input) { static void VerifyIOVec(const string& input) {
string compressed; string compressed;
@ -559,6 +576,28 @@ static void VerifyNonBlockedCompression(const string& input) {
CHECK(snappy::Uncompress(compressed.data(), compressed.size(), &uncomp_str)); CHECK(snappy::Uncompress(compressed.data(), compressed.size(), &uncomp_str));
CHECK_EQ(uncomp_str, input); CHECK_EQ(uncomp_str, input);
// Uncompress using source/sink
string uncomp_str2;
uncomp_str2.resize(input.size());
snappy::UncheckedByteArraySink sink(string_as_array(&uncomp_str2));
snappy::ByteArraySource source(compressed.data(), compressed.size());
CHECK(snappy::Uncompress(&source, &sink));
CHECK_EQ(uncomp_str2, input);
// Uncompress into iovec
{
static const int kNumBlocks = 10;
struct iovec vec[kNumBlocks];
const int block_size = 1 + input.size() / kNumBlocks;
string iovec_data(block_size * kNumBlocks, 'x');
for (int i = 0; i < kNumBlocks; i++) {
vec[i].iov_base = string_as_array(&iovec_data) + i * block_size;
vec[i].iov_len = block_size;
}
CHECK(snappy::RawUncompressToIOVec(compressed.data(), compressed.size(),
vec, kNumBlocks));
CHECK_EQ(string(iovec_data.data(), input.size()), input);
}
} }
// Expand the input so that it is at least K times as big as block size // Expand the input so that it is at least K times as big as block size
@ -577,6 +616,8 @@ static int Verify(const string& input) {
// Compress using string based routines // Compress using string based routines
const int result = VerifyString(input); const int result = VerifyString(input);
// Verify using sink based routines
VerifyStringSink(input);
VerifyNonBlockedCompression(input); VerifyNonBlockedCompression(input);
VerifyIOVec(input); VerifyIOVec(input);
@ -1291,6 +1332,37 @@ static void BM_UIOVec(int iters, int arg) {
} }
BENCHMARK(BM_UIOVec)->DenseRange(0, 4); BENCHMARK(BM_UIOVec)->DenseRange(0, 4);
static void BM_UFlatSink(int iters, int arg) {
StopBenchmarkTiming();
// Pick file to process based on "arg"
CHECK_GE(arg, 0);
CHECK_LT(arg, ARRAYSIZE(files));
string contents = ReadTestDataFile(files[arg].filename,
files[arg].size_limit);
string zcontents;
snappy::Compress(contents.data(), contents.size(), &zcontents);
char* dst = new char[contents.size()];
SetBenchmarkBytesProcessed(static_cast<int64>(iters) *
static_cast<int64>(contents.size()));
SetBenchmarkLabel(files[arg].label);
StartBenchmarkTiming();
while (iters-- > 0) {
snappy::ByteArraySource source(zcontents.data(), zcontents.size());
snappy::UncheckedByteArraySink sink(dst);
CHECK(snappy::Uncompress(&source, &sink));
}
StopBenchmarkTiming();
string s(dst, contents.size());
CHECK_EQ(contents, s);
delete[] dst;
}
BENCHMARK(BM_UFlatSink)->DenseRange(0, ARRAYSIZE(files) - 1);
static void BM_ZFlat(int iters, int arg) { static void BM_ZFlat(int iters, int arg) {
StopBenchmarkTiming(); StopBenchmarkTiming();