rocksdb/utilities/col_buf_decoder.cc

241 lines
6.6 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/col_buf_decoder.h"
#include <cstring>
#include <string>
#include "port/port.h"
namespace rocksdb {
// Out-of-line destructor definition; the base class has nothing to release.
ColBufDecoder::~ColBufDecoder() = default;
namespace {
// Returns `val` unchanged when the requested byte order matches the host's
// byte order, and EndianTransform(val, size) otherwise.
//
//   val        - the value to convert.
//   big_endian - true if the column is declared big-endian.
//   size       - forwarded to EndianTransform as the value's width in bytes.
inline uint64_t EncodeFixed64WithEndian(uint64_t val, bool big_endian,
                                        size_t size) {
  const bool host_is_little = port::kLittleEndian;
  // A transform is needed in exactly two cases: big-endian data on a
  // little-endian host, or little-endian data on a big-endian host. Both
  // reduce to "big_endian equals host_is_little".
  if (big_endian != host_is_little) {
    return val;
  }
  return EndianTransform(val, size);
}
}  // namespace
// Factory: builds the decoder matching `col_declaration.col_type`.
// Recognized types are "FixedLength", "VariableLength", "VariableChunk" and
// "LongFixedLength". Returns a heap-allocated decoder (allocated with `new`),
// or nullptr when the column type is not recognized.
ColBufDecoder* ColBufDecoder::NewColBufDecoder(
    const ColDeclaration& col_declaration) {
  const auto& type_name = col_declaration.col_type;

  if (type_name == "FixedLength") {
    return new FixedLengthColBufDecoder(
        col_declaration.size, col_declaration.col_compression_type,
        col_declaration.nullable, col_declaration.big_endian);
  }
  if (type_name == "VariableLength") {
    return new VariableLengthColBufDecoder();
  }
  if (type_name == "VariableChunk") {
    return new VariableChunkColBufDecoder(col_declaration.col_compression_type);
  }
  if (type_name == "LongFixedLength") {
    return new LongFixedLengthColBufDecoder(col_declaration.size,
                                            col_declaration.nullable);
  }

  // Unrecognized column type
  return nullptr;
}
namespace {
// Reads one varint64 starting at `*src_ptr`, stores it in `*val_ptr`, and
// moves `*src_ptr` to the first byte after the encoded value. The read is
// bounded to 10 bytes from the starting position; a failed decode is only
// caught by the assert.
void ReadVarint64(const char** src_ptr, uint64_t* val_ptr) {
  const int kReadBound = 10;
  const char* const begin = *src_ptr;
  const char* const next = GetVarint64Ptr(begin, begin + kReadBound, val_ptr);
  assert(next != nullptr);
  *src_ptr = next;
}
}  // namespace
// Clears the run counter, the last decoded value and the dictionary, then,
// for kColDict / kColRleDict columns, loads the dictionary stored at `src`:
// a varint entry count followed by that many varint entries. Each entry is
// passed through EncodeFixed64WithEndian before being stored.
// Returns the number of bytes of `src` consumed (0 when there is no
// dictionary to read).
size_t FixedLengthColBufDecoder::Init(const char* src) {
  remain_runs_ = 0;
  last_val_ = 0;
  dict_vec_.clear();

  const bool uses_dict = (col_compression_type_ == kColDict) ||
                         (col_compression_type_ == kColRleDict);
  if (!uses_dict) {
    return 0;
  }

  const char* cursor = src;
  uint64_t entry_count;
  ReadVarint64(&cursor, &entry_count);

  uint64_t entry;
  for (uint64_t n = 0; n != entry_count; ++n) {
    ReadVarint64(&cursor, &entry);
    entry = EncodeFixed64WithEndian(entry, big_endian_, size_);
    dict_vec_.push_back(entry);
  }
  return cursor - src;
}
// Decodes one value of this column from `src`, writes size_ bytes of it to
// `*dest`, and advances `*dest` past the bytes written.
//
// Returns the number of bytes consumed from `src`. For a nullable column
// whose leading flag byte is zero, returns 1 and writes nothing to `*dest`.
//
// The call updates decoder state: run_val_ / remain_runs_ for run-length
// compression types and last_val_ for delta compression types.
size_t FixedLengthColBufDecoder::Decode(const char* src, char** dest) {
uint64_t read_val = 0;
const char* orig_src = src;
// Upper bound handed to every varint read in this call; computed from the
// initial `src`, before the optional null flag is skipped.
const char* src_limit = src + 20;
if (nullable_) {
// A nullable column is prefixed by a one-byte flag: non-zero means a value
// follows, zero means null.
bool not_null;
not_null = *src;
src += 1;
if (!not_null) {
return 1;
}
}
if (IsRunLength(col_compression_type_)) {
// Run-length types: a new (value, run count) pair is read only when the
// current run has been fully consumed.
if (remain_runs_ == 0) {
const char* q;
run_val_ = 0;
if (col_compression_type_ == kColRle) {
// kColRle stores the run value as size_ raw bytes.
memcpy(&run_val_, src, size_);
src += size_;
} else {
// The other run-length types store the run value as a varint.
q = GetVarint64Ptr(src, src_limit, &run_val_);
assert(q != nullptr);
src = q;
}
// The run count follows the run value as a varint.
q = GetVarint64Ptr(src, src_limit, &remain_runs_);
assert(q != nullptr);
src = q;
// Delta and dictionary run values are not endian-transformed here: they are
// a delta / a dictionary index, not the final column value.
if (col_compression_type_ != kColRleDeltaVarint &&
col_compression_type_ != kColRleDict) {
run_val_ = EncodeFixed64WithEndian(run_val_, big_endian_, size_);
}
}
read_val = run_val_;
} else {
if (col_compression_type_ == kColNoCompression) {
// Uncompressed: the value is stored as size_ raw bytes.
memcpy(&read_val, src, size_);
src += size_;
} else {
// Assume a column does not exceed 8 bytes here
const char* q = GetVarint64Ptr(src, src_limit, &read_val);
assert(q != nullptr);
src = q;
}
// As above, deltas and dictionary indices are left untransformed.
if (col_compression_type_ != kColDeltaVarint &&
col_compression_type_ != kColDict) {
read_val = EncodeFixed64WithEndian(read_val, big_endian_, size_);
}
}
uint64_t write_val = read_val;
if (col_compression_type_ == kColDeltaVarint ||
col_compression_type_ == kColRleDeltaVarint) {
// does not support 64 bit
// Zigzag-decode the stored delta: the low bit selects an all-ones or
// all-zeros mask that is XORed with the remaining bits.
uint64_t mask = (write_val & 1) ? (~uint64_t(0)) : 0;
int64_t delta = (write_val >> 1) ^ mask;
write_val = last_val_ + delta;
// last_val_ keeps the running value before the endian transform; only the
// copy written to the output is transformed.
uint64_t tmp = write_val;
write_val = EncodeFixed64WithEndian(write_val, big_endian_, size_);
last_val_ = tmp;
} else if (col_compression_type_ == kColRleDict ||
col_compression_type_ == kColDict) {
// The decoded value is an index into the dictionary loaded by Init(). The
// index is range-checked by the assert only.
uint64_t dict_val = read_val;
assert(dict_val < dict_vec_.size());
write_val = dict_vec_[dict_val];
}
// Emit the first size_ bytes of write_val and advance the output cursor.
memcpy(*dest, reinterpret_cast<char*>(&write_val), size_);
*dest += size_;
if (IsRunLength(col_compression_type_)) {
// One element of the current run has been produced.
--remain_runs_;
}
return src - orig_src;
}
// Decodes one fixed-length value of size_ bytes from `src` into `*dest` and
// advances `*dest` past the bytes written.
//
// For a nullable column the value is preceded by a one-byte flag; a zero flag
// means null, in which case nothing is written and 1 is returned.
//
// Returns the number of bytes consumed from `src`: size_ for a non-nullable
// column, size_ + 1 for a nullable column holding a value, 1 for a null.
size_t LongFixedLengthColBufDecoder::Decode(const char* src, char** dest) {
  const char* const orig_src = src;
  if (nullable_) {
    const bool not_null = *src;
    src += 1;
    if (!not_null) {
      return 1;
    }
  }
  memcpy(*dest, src, size_);
  *dest += size_;
  src += size_;
  // Report what was actually read. Unconditionally returning size_ + 1 would
  // count a flag byte that a non-nullable column never read.
  return src - orig_src;
}
// Decodes one length-prefixed value from `src`: a single length byte followed
// by that many payload bytes. Both the length byte and the payload are copied
// to `*dest`, and `*dest` is advanced past them.
//
// Returns the number of bytes consumed from `src` (payload length + 1).
size_t VariableLengthColBufDecoder::Decode(const char* src, char** dest) {
  // The first byte holds the payload length (0-255).
  const uint8_t len = static_cast<uint8_t>(*src);
  src += 1;

  // Copy the length byte into the output buffer. The destination must be
  // `*dest` (the buffer), not `dest` (the address of the caller's cursor):
  // writing through `dest` would overwrite a byte of the pointer itself.
  memcpy(*dest, &len, 1);
  *dest += 1;

  // Copy the payload right after the length byte.
  memcpy(*dest, src, len);
  *dest += len;

  return static_cast<size_t>(len) + 1;
}
// Clears the dictionary and, for kColDict columns, reloads it from `src`:
// a varint entry count followed by that many varint entries, stored as read.
// Returns the number of bytes of `src` consumed (0 when there is no
// dictionary to read).
size_t VariableChunkColBufDecoder::Init(const char* src) {
  dict_vec_.clear();
  if (col_compression_type_ != kColDict) {
    return 0;
  }

  const char* cursor = src;
  uint64_t entry_count;
  ReadVarint64(&cursor, &entry_count);

  uint64_t entry;
  for (uint64_t n = 0; n != entry_count; ++n) {
    ReadVarint64(&cursor, &entry);
    dict_vec_.push_back(entry);
  }
  return cursor - src;
}
size_t VariableChunkColBufDecoder::Decode(const char* src, char** dest) {
const char* orig_src = src;
uint64_t size = 0;
ReadVarint64(&src, &size);
int64_t full_chunks = size / 8;
uint64_t chunk_buf;
size_t chunk_size = 8;
for (int64_t i = 0; i < full_chunks + 1; ++i) {
chunk_buf = 0;
if (i == full_chunks) {
chunk_size = size % 8;
}
if (col_compression_type_ == kColDict) {
uint64_t dict_val;
ReadVarint64(&src, &dict_val);
assert(dict_val < dict_vec_.size());
chunk_buf = dict_vec_[dict_val];
} else {
memcpy(&chunk_buf, src, chunk_size);
src += chunk_size;
}
memcpy(*dest, reinterpret_cast<char*>(&chunk_buf), 8);
*dest += 8;
uint8_t mask = ((0xFF - 8) + chunk_size) & 0xFF;
memcpy(*dest, reinterpret_cast<char*>(&mask), 1);
*dest += 1;
}
return src - orig_src;
}
} // namespace rocksdb