Encode id1Type, id2Type and data version the assoc payload.

Summary:

Test Plan:

Reviewers:

CC:

Task ID: #

Blame Rev:
This commit is contained in:
Dhruba Borthakur 2012-08-02 14:09:13 -07:00
parent a3a8f83482
commit 0c98fdcf27
3 changed files with 77 additions and 26 deletions

View File

@ -27,6 +27,8 @@ using namespace ::Tleveldb;
//
// These are the service methods that processes Association Data.
// Native types are stored in big-endian format, i.e. first bytes
// have most significant bits.
class AssocServiceHandler : virtual public AssocServiceIf {
public:
@ -102,8 +104,10 @@ class AssocServiceHandler : virtual public AssocServiceIf {
woptions.sync = true;
// create the payload for this assoc
int payloadsize = sizeof(id1Type) + sizeof(id2Type) +
sizeof(dataVersion) + data.size() + wormhole_comment.size();
int payloadsize = sizeof(id1Type) + sizeof(id2Type) + sizeof(dataVersion) +
sizeof(int32_t) + // store the data size
sizeof(int32_t) + // store the wormhole comment size
data.size() + wormhole_comment.size();
std::string payload;
payload.reserve(payloadsize);
payload.resize(payloadsize);
@ -270,11 +274,15 @@ class AssocServiceHandler : virtual public AssocServiceIf {
int64_t assocType, int64_t id1,
const std::vector<int64_t> & id2s) {
int64_t ts, id2;
int8_t oldvis;
leveldb::ReadOptions roptions;
leveldb::Status status;
std::string value;
int numout = 0;
if (id2s.size() > 10000) {
throw generate_exception(tableName, Code::kNotFound,
"Ids2 cannot be gteater than 10K.",
assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1);
}
// allocate the entire result buffer.
_return.reserve(id2s.size());
// create max key to query
int maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(ts) +
@ -288,9 +296,14 @@ class AssocServiceHandler : virtual public AssocServiceIf {
int rowkeysize = makeRowKey(keybuf, id1, assocType);
for (unsigned int index = 0; index < id2s.size(); index++) {
int64_t id2 = id2s[index];
int64_t ts;
int8_t oldvis;
leveldb::ReadOptions roptions;
leveldb::Status status;
std::string value, wormhole;
// query column 'm'$id2
id2 = id2s[index];
int keysize = appendRowKeyForMeta(rowkeysize, keybuf, id2);
leveldb::Slice ckey(keybuf, keysize);
status = db->Get(roptions, ckey, &value);
@ -304,12 +317,12 @@ class AssocServiceHandler : virtual public AssocServiceIf {
"Unable to find m$id2 ",
assocType, id1, id2, 0, 0, 0, Tleveldb::UNUSED1);
}
extractTsVisString(&ts, &oldvis, &value[0]);
if(oldvis != AssocVisibility::VISIBLE) {
continue;
}
ASSERT_NE(ts, 0);
printf("XXX ts = %ld\n", ts);
// this assoc is visible, scan 'p'$ts$id2 to retrieve payload.
keysize = appendRowKeyForPayload(rowkeysize, keybuf, ts, id2);
@ -318,7 +331,6 @@ class AssocServiceHandler : virtual public AssocServiceIf {
// parse results retrieved from database
if (status.IsNotFound()) {
printf("XXX2");
continue; // non existant assoc
} else if (!status.ok()) {
throw generate_exception(tableName, Code::kNotFound,
@ -326,15 +338,16 @@ class AssocServiceHandler : virtual public AssocServiceIf {
assocType, id1, id2, 0, 0, 0, Tleveldb::UNUSED1);
}
// update return values
_return[numout].id2 = id2;
_return[numout].data = value;
// allocate a new slot in the result set.
_return.resize(_return.size() + 1);
TaoAssocGetResult* result = &_return.back();
// un-encoded from the payload XXX
_return[numout].id1Type = 0;
_return[numout].id2Type = 0;
_return[numout].dataVersion = 0;
numout++;
// Fill up new element in result set.
result->id2 = id2;
result->time = ts;
extractPayload((char*)value.c_str(), &result->id1Type,
&result->id2Type,
&result->dataVersion, result->data, wormhole);
}
}
@ -370,20 +383,39 @@ class AssocServiceHandler : virtual public AssocServiceIf {
return rowkeysize + sizeof(id2) + 1;
}
//
// encode id1Type, id2Type, dataversion, etc into the payload
inline void makePayload(char* dest, int64_t id1Type, int64_t id2Type,
void makePayload(char* dest, int64_t id1Type, int64_t id2Type,
int64_t dataVersion, const Text& data,
const Text& wormhole_comment) {
int32_t datasize = data.size();
int32_t wormhole_commentsize = wormhole_comment.size();
dest = copy_int64_switch_endian(dest, id1Type);
dest = copy_int64_switch_endian(dest, id2Type);
dest = copy_int64_switch_endian(dest, dataVersion);
dest = copy_int32(dest, datasize);
dest = copy_int32(dest, wormhole_commentsize);
memcpy(dest, data.data(), data.size());
dest += data.size();
memcpy(dest, wormhole_comment.data(), wormhole_comment.size());
dest += wormhole_comment.size();
}
// extract id1Type, id2Type, dataversion, etc from payload
void extractPayload(char* dest, int64_t* id1Type, int64_t* id2Type,
int64_t* dataVersion, Text& data,
Text& wormhole_comment) {
int32_t datasize, wormsize;
extract_int64(id1Type, dest); dest += sizeof(*id1Type);
extract_int64(id2Type, dest); dest += sizeof(*id2Type);
extract_int64(dataVersion, dest); dest += sizeof(*dataVersion);
extract_int32(&datasize, dest); dest += sizeof(datasize);
extract_int32(&wormsize, dest); dest += sizeof(wormsize);
data.assign(dest, datasize); dest += datasize;
wormhole_comment.assign(dest, wormsize); dest += wormsize;
}
// fill the timestamp and visibility
inline void makeTsVisString(char* dest, int64_t ts, int8_t vis) {
dest = copy_int64_switch_endian(dest, ts);
@ -415,10 +447,31 @@ class AssocServiceHandler : virtual public AssocServiceIf {
// extracts a int64 type from the char stream. Swaps endianness.
inline void extract_int64(int64_t* dest, char* src) {
char* d = (char *)dest;
src += sizeof(int64_t) - 1;
for (unsigned int i = 0; i < sizeof(uint64_t); i++) {
*d++ = *src--;
}
}
//
// copy a 4 byte quantity to byte stream. swap endianess.
//
inline char* copy_int32(char* dest, int32_t id) {
char* src = (char *)&id + sizeof(id) - 1;
for (unsigned int i = 0; i < sizeof(id); i++) {
*dest++ = *src--;
}
return dest;
}
// extract a 4 byte quantity from a byte stream
inline void extract_int32(int32_t* dest, char* src) {
char* d = (char *)dest;
src += sizeof(int32_t) - 1;
for (unsigned int i = 0; i < sizeof(*dest); i++) {
*d++ = *src--;
}
}
// extracts a 1 byte integer from the char stream.

View File

@ -67,7 +67,8 @@ struct onehandle {
unordered_map<uint64_t, struct iteratorEntry*> iterlist;
// list of iterators for this database
onehandle() : currentSnapshotId(1), currentIteratorId(1) {
onehandle() : onedb(NULL),
refcount(0), currentSnapshotId(1), currentIteratorId(1) {
}
// stores a new leveldb snapshot and returns an unique id

View File

@ -245,15 +245,12 @@ static void testAssocs() {
aclient->taoAssocGet(readback, dbname,
assocType, id1, id2list);
printf("AssocGet suceeded.\n");
printf("size = %lu\n", readback.size());
ASSERT_EQ(1, readback.size());
ASSERT_EQ((unsigned int)1, readback.size());
ASSERT_EQ(id1Type, readback[0].id1Type);
printf("XXX %ld %ld\n", id1Type, readback[0].id1Type);
ASSERT_EQ(id2Type, readback[0].id2Type);
ASSERT_EQ(ts, readback[0].time);
ASSERT_EQ(dataVersion, readback[0].dataVersion);
ASSERT_EQ(readback[0].data.compare(wormhole_comments), 0);
ASSERT_EQ(readback[0].data.compare(data), 0);
}
//