add bzip2 compression

Summary: add bzip2 compression

Test Plan: testcases in table_test

Reviewers: dhruba

Reviewed By: dhruba

Differential Revision: https://reviews.facebook.net/D3909
This commit is contained in:
heyongqiang 2012-06-28 19:26:43 -07:00
parent 054a5657f8
commit daa816c4a0
7 changed files with 181 additions and 8 deletions

View File

@ -149,6 +149,16 @@ EOF
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lz"
fi
# Test whether bzip library is installed
# A tiny program that includes <bzlib.h> is fed to the compiler on stdin
# ("-x c++ -"); the resulting binary and all diagnostics are discarded, so only
# the compiler's exit status matters.
# NOTE(review): the probe is not given -lbz2, so it appears to verify only that
# the header can be used, not that libbz2 can be linked -- confirm intended.
$CXX $CFLAGS $COMMON_FLAGS -x c++ - -o /dev/null 2>/dev/null <<EOF
#include <bzlib.h>
int main() {}
EOF
# On success, define BZIP2 for the sources and add libbz2 to the link flags.
if [ "$?" = 0 ]; then
COMMON_FLAGS="$COMMON_FLAGS -DBZIP2"
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lbz2"
fi
# Test whether tcmalloc is available
$CXX $CFLAGS -x c++ - -o /dev/null -ltcmalloc 2>/dev/null <<EOF
int main() {}

View File

@ -26,7 +26,8 @@ enum CompressionType {
// part of the persistent format on disk.
kNoCompression = 0x0,
kSnappyCompression = 0x1,
kZlibCompression =0x2
kZlibCompression = 0x2,
kBZip2Compression = 0x3
};
// Options to control the behavior of a database (passed to DB::Open)

View File

@ -143,14 +143,23 @@ inline bool Snappy_Uncompress(
}
// Placeholder for zlib compression in the example port: the body does no
// work and always returns false, leaving *output untouched.
// The parameters after `output` all carry default values.
inline bool Zlib_Compress(const char* input, size_t length,
::std::string* output, int level = -1, int strategy = 0) {
::std::string* output, int windowBits = 15, int level = -1,
int strategy = 0) {
return false;
}
// Placeholder for zlib decompression in the example port: the body does no
// work and simply returns false.
// NOTE(review): under the char*-returning signature, `return false` depends on
// a bool-to-pointer conversion that newer C++ standards reject; returning
// NULL looks like the intent -- confirm.
inline bool Zlib_Uncompress(
const char* input_data,
size_t input_length,
char* output) {
inline char* Zlib_Uncompress(const char* input_data, size_t input_length,
int* decompress_size, int windowBits = 15) {
return false;
}
// Placeholder for bzip2 compression in the example port.
// Performs no work: *output is left untouched and the call always reports
// failure by returning false.
inline bool BZip2_Compress(const char* input, size_t length,
                           ::std::string* output) {
  // The arguments are intentionally ignored by this stub.
  (void)input;
  (void)length;
  (void)output;
  return false;
}
// Placeholder for bzip2 decompression in the example port.
// Performs no work: *decompress_size is left untouched and the call always
// reports failure by returning a null pointer.
inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
                              int* decompress_size) {
  // A pointer-returning function must return a null pointer, not `false`:
  // the bool-to-pointer conversion is rejected by C++11 and later.
  return NULL;
}

View File

@ -27,9 +27,15 @@
#ifdef SNAPPY
#include <snappy.h>
#endif
#ifdef ZLIB
#include <zlib.h>
#endif
#ifdef BZIP2
#include <bzlib.h>
#endif
#include <stdint.h>
#include <string>
#include <string.h>
@ -245,6 +251,120 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length,
return false;
}
// Compresses input[0, length) into *output as one complete bzip2 stream.
//
// Returns true only once the library has reported the end of the stream, so
// a true result always means *output holds the whole compressed stream.
// Returns false if the library reports an error, if length does not fit the
// unsigned int byte counters of bz_stream, or if this build has no BZIP2
// support (in which case *output is left untouched). After a library
// failure the contents of *output are unspecified.
inline bool BZip2_Compress(const char* input, size_t length,
                           ::std::string* output) {
#ifdef BZIP2
  // bz_stream counts bytes in unsigned int; refuse inputs that would be
  // silently truncated.
  const unsigned int input_size = static_cast<unsigned int>(length);
  if (input_size != length) {
    return false;
  }

  bz_stream _stream;
  memset(&_stream, 0, sizeof(bz_stream));

  // Block size 1 is 100K.
  // 0 is for silent.
  // 30 is the default workFactor
  int st = BZ2_bzCompressInit(&_stream, 1, 0, 30);
  if (st != BZ_OK) {
    return false;
  }

  // Start with room for the plain data length. The stream header and trailer
  // alone can exceed a tiny input, so never start below a small minimum;
  // this also guarantees the library always has output space to work with.
  const size_t kMinOutputSize = 64;
  output->resize(length < kMinOutputSize ? kMinOutputSize : length);

  // Hand the whole input to the library once. While a BZ_FINISH sequence is
  // in progress the input fields must not be modified by the caller.
  _stream.next_in = const_cast<char*>(input);
  _stream.avail_in = input_size;
  _stream.next_out = &(*output)[0];
  _stream.avail_out = static_cast<unsigned int>(output->size());

  // Completion is signalled by BZ_STREAM_END, not by the input being used
  // up: the library may swallow all input long before the compressed bytes
  // have been written out.
  for (;;) {
    st = BZ2_bzCompress(&_stream, BZ_FINISH);
    if (st == BZ_STREAM_END) {
      break;
    }
    if (st != BZ_FINISH_OK) {
      // Only bzip2 status codes are examined here, so this code does not
      // depend on zlib.h being available.
      BZ2_bzCompressEnd(&_stream);
      return false;
    }
    // More output space is needed. Grow by about 20%, and by at least one
    // byte so that small buffers always make progress.
    const size_t used = output->size() - _stream.avail_out;
    const size_t extra = output->size() / 5 + 1;
    output->resize(output->size() + extra);
    _stream.next_out = &(*output)[used];
    _stream.avail_out = static_cast<unsigned int>(output->size() - used);
  }

  // Trim the unused tail of the buffer.
  output->resize(output->size() - _stream.avail_out);
  BZ2_bzCompressEnd(&_stream);
  return true;
#else
  return false;
#endif
}
// Decompresses the bzip2 stream stored in input_data[0, input_length).
//
// On success returns a buffer allocated with new[] (the caller must release
// it with delete[]) and stores the number of decompressed bytes in
// *decompress_size.
// Returns NULL, leaving *decompress_size untouched, if the data is corrupt
// or truncated, if bytes follow the end of the stream, if a size does not
// fit the int/unsigned int counters involved, or if this build has no BZIP2
// support.
// An empty input is reported as an empty block rather than as an error.
inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
                              int* decompress_size) {
#ifdef BZIP2
  // bz_stream counts bytes in unsigned int; refuse inputs that would be
  // silently truncated.
  const unsigned int input_size = static_cast<unsigned int>(input_length);
  if (input_size != input_length) {
    return NULL;
  }

  if (input_length == 0) {
    *decompress_size = 0;
    return new char[0];
  }

  bz_stream _stream;
  memset(&_stream, 0, sizeof(bz_stream));

  int st = BZ2_bzDecompressInit(&_stream, 0, 0);
  if (st != BZ_OK) {
    return NULL;
  }

  _stream.next_in = const_cast<char*>(input_data);
  _stream.avail_in = input_size;

  // The result size is reported through an int, so the output buffer is
  // never allowed to grow beyond the largest positive int.
  const size_t kMaxOutputSize = static_cast<size_t>(~0u >> 1);

  // Assume the decompressed data size will be 5x of compressed size.
  size_t output_len = kMaxOutputSize;
  if (input_length <= kMaxOutputSize / 5) {
    output_len = input_length * 5;
  }
  char* output = new char[output_len];
  _stream.next_out = output;
  _stream.avail_out = static_cast<unsigned int>(output_len);

  // Completion is signalled by BZ_STREAM_END, not by the input being used
  // up: the library can consume all input before the decompressed bytes
  // have been written out.
  bool ok = false;
  for (;;) {
    st = BZ2_bzDecompress(&_stream);
    if (st == BZ_STREAM_END) {
      // Bytes left after the logical end of the stream mean the block is
      // not what was written, so treat that as corruption.
      ok = (_stream.avail_in == 0);
      break;
    }
    if (st != BZ_OK) {
      // Only bzip2 status codes are examined here, so this code does not
      // depend on zlib.h being available.
      break;
    }
    if (_stream.avail_out != 0) {
      if (_stream.avail_in == 0) {
        // Room is left but nothing more can be produced: the stream wants
        // input that does not exist, i.e. the data is truncated.
        break;
      }
      continue;
    }
    // No output space. Increase the output space by about 20%, and by at
    // least one byte so that small buffers always make progress.
    if (output_len >= kMaxOutputSize) {
      break;
    }
    size_t grown_len = output_len + output_len / 5 + 1;
    if (grown_len > kMaxOutputSize) {
      grown_len = kMaxOutputSize;
    }
    char* grown = new char[grown_len];
    memcpy(grown, output, output_len);
    delete[] output;
    output = grown;
    // Set more output.
    _stream.next_out = output + output_len;
    _stream.avail_out = static_cast<unsigned int>(grown_len - output_len);
    output_len = grown_len;
  }

  BZ2_bzDecompressEnd(&_stream);
  if (!ok) {
    delete[] output;
    return NULL;
  }
  *decompress_size = static_cast<int>(output_len - _stream.avail_out);
  return output;
#else
  return NULL;
#endif
}
// Heap-profile hook of this port. The callback and its argument are ignored
// and the function always returns false.
inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
  // Neither argument is used.
  (void)func;
  (void)arg;
  return false;
}

View File

@ -99,6 +99,7 @@ Status ReadBlock(RandomAccessFile* file,
}
char* ubuf = NULL;
int decompress_size = 0;
switch (data[n]) {
case kNoCompression:
if (data != buf) {
@ -136,7 +137,6 @@ Status ReadBlock(RandomAccessFile* file,
break;
}
case kZlibCompression:
int decompress_size;
ubuf = port::Zlib_Uncompress(data, n, &decompress_size);
if (!ubuf) {
delete[] buf;
@ -147,6 +147,17 @@ Status ReadBlock(RandomAccessFile* file,
result->heap_allocated = true;
result->cachable = true;
break;
case kBZip2Compression:
ubuf = port::BZip2_Uncompress(data, n, &decompress_size);
if (!ubuf) {
delete[] buf;
return Status::Corruption("corrupted compressed block contents");
}
delete[] buf;
result->data = Slice(ubuf, decompress_size);
result->heap_allocated = true;
result->cachable = true;
break;
default:
delete[] buf;
return Status::Corruption("bad block type");

View File

@ -151,6 +151,7 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
Slice raw = block->Finish();
Slice block_contents;
std::string* compressed = &r->compressed_output;
CompressionType type = r->options.compression;
switch (type) {
case kNoCompression:
@ -171,7 +172,6 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
break;
}
case kZlibCompression:
std::string* compressed = &r->compressed_output;
if (port::Zlib_Compress(raw.data(), raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
@ -182,6 +182,17 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
type = kNoCompression;
}
break;
case kBZip2Compression:
if (port::BZip2_Compress(raw.data(), raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// BZip not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();

View File

@ -408,6 +408,12 @@ static bool ZlibCompressionSupported() {
return port::Zlib_Compress(in.data(), in.size(), &out);
}
static bool BZip2CompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::BZip2_Compress(in.data(), in.size(), &out);
}
enum TestType {
TABLE_TEST,
BLOCK_TEST,
@ -446,6 +452,11 @@ static std::vector<TestArgs> Generate_Arg_List()
compression_types.push_back(kZlibCompression);
#endif
#ifdef BZIP2
if (BZip2CompressionSupported())
compression_types.push_back(kBZip2Compression);
#endif
for(int i =0; i < test_type_len; i++)
for (int j =0; j < reverse_compare_len; j++)
for (int k =0; k < restart_interval_len; k++)