From 0c98fdcf279254a5f556b37a34a6710b4b322729 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Thu, 2 Aug 2012 14:09:13 -0700 Subject: [PATCH] Encode id1Type, id2Type and data version the assoc payload. Summary: Test Plan: Reviewers: CC: Task ID: # Blame Rev: --- thrift/assoc.h | 93 ++++++++++++++++++++++++++++++-------- thrift/openhandles.h | 3 +- thrift/test/simpletest.cpp | 7 +-- 3 files changed, 77 insertions(+), 26 deletions(-) diff --git a/thrift/assoc.h b/thrift/assoc.h index b4c75943f6..5e429c990c 100644 --- a/thrift/assoc.h +++ b/thrift/assoc.h @@ -27,6 +27,8 @@ using namespace ::Tleveldb; // // These are the service methods that processes Association Data. +// Native types are stored in big-endian format, i.e. first bytes +// have most significant bits. class AssocServiceHandler : virtual public AssocServiceIf { public: @@ -102,8 +104,10 @@ class AssocServiceHandler : virtual public AssocServiceIf { woptions.sync = true; // create the payload for this assoc - int payloadsize = sizeof(id1Type) + sizeof(id2Type) + - sizeof(dataVersion) + data.size() + wormhole_comment.size(); + int payloadsize = sizeof(id1Type) + sizeof(id2Type) + sizeof(dataVersion) + + sizeof(int32_t) + // store the data size + sizeof(int32_t) + // store the wormhole comment size + data.size() + wormhole_comment.size(); std::string payload; payload.reserve(payloadsize); payload.resize(payloadsize); @@ -270,11 +274,15 @@ class AssocServiceHandler : virtual public AssocServiceIf { int64_t assocType, int64_t id1, const std::vector & id2s) { int64_t ts, id2; - int8_t oldvis; - leveldb::ReadOptions roptions; - leveldb::Status status; - std::string value; - int numout = 0; + + if (id2s.size() > 10000) { + throw generate_exception(tableName, Code::kNotFound, + "Ids2 cannot be gteater than 10K.", + assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1); + } + // allocate the entire result buffer. + _return.reserve(id2s.size()); + // create max key to query int maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(ts) + @@ -288,9 +296,14 @@ class AssocServiceHandler : virtual public AssocServiceIf { int rowkeysize = makeRowKey(keybuf, id1, assocType); for (unsigned int index = 0; index < id2s.size(); index++) { - int64_t id2 = id2s[index]; + int64_t ts; + int8_t oldvis; + leveldb::ReadOptions roptions; + leveldb::Status status; + std::string value, wormhole; // query column 'm'$id2 + id2 = id2s[index]; int keysize = appendRowKeyForMeta(rowkeysize, keybuf, id2); leveldb::Slice ckey(keybuf, keysize); status = db->Get(roptions, ckey, &value); @@ -304,12 +317,12 @@ class AssocServiceHandler : virtual public AssocServiceIf { "Unable to find m$id2 ", assocType, id1, id2, 0, 0, 0, Tleveldb::UNUSED1); } + extractTsVisString(&ts, &oldvis, &value[0]); if(oldvis != AssocVisibility::VISIBLE) { continue; } ASSERT_NE(ts, 0); - printf("XXX ts = %ld\n", ts); // this assoc is visible, scan 'p'$ts$id2 to retrieve payload. keysize = appendRowKeyForPayload(rowkeysize, keybuf, ts, id2); @@ -318,7 +331,6 @@ class AssocServiceHandler : virtual public AssocServiceIf { // parse results retrieved from database if (status.IsNotFound()) { - printf("XXX2"); continue; // non existant assoc } else if (!status.ok()) { throw generate_exception(tableName, Code::kNotFound, @@ -326,15 +338,16 @@ class AssocServiceHandler : virtual public AssocServiceIf { assocType, id1, id2, 0, 0, 0, Tleveldb::UNUSED1); } - // update return values - _return[numout].id2 = id2; - _return[numout].data = value; + // allocate a new slot in the result set. + _return.resize(_return.size() + 1); + TaoAssocGetResult* result = &_return.back(); - // un-encoded from the payload XXX - _return[numout].id1Type = 0; - _return[numout].id2Type = 0; - _return[numout].dataVersion = 0; - numout++; + // Fill up new element in result set. + result->id2 = id2; + result->time = ts; + extractPayload((char*)value.c_str(), &result->id1Type, + &result->id2Type, + &result->dataVersion, result->data, wormhole); } } @@ -370,20 +383,39 @@ class AssocServiceHandler : virtual public AssocServiceIf { return rowkeysize + sizeof(id2) + 1; } - // // encode id1Type, id2Type, dataversion, etc into the payload - inline void makePayload(char* dest, int64_t id1Type, int64_t id2Type, + void makePayload(char* dest, int64_t id1Type, int64_t id2Type, int64_t dataVersion, const Text& data, const Text& wormhole_comment) { + int32_t datasize = data.size(); + int32_t wormhole_commentsize = wormhole_comment.size(); + dest = copy_int64_switch_endian(dest, id1Type); dest = copy_int64_switch_endian(dest, id2Type); dest = copy_int64_switch_endian(dest, dataVersion); + dest = copy_int32(dest, datasize); + dest = copy_int32(dest, wormhole_commentsize); memcpy(dest, data.data(), data.size()); dest += data.size(); memcpy(dest, wormhole_comment.data(), wormhole_comment.size()); dest += wormhole_comment.size(); } + // extract id1Type, id2Type, dataversion, etc from payload + void extractPayload(char* dest, int64_t* id1Type, int64_t* id2Type, + int64_t* dataVersion, Text& data, + Text& wormhole_comment) { + int32_t datasize, wormsize; + extract_int64(id1Type, dest); dest += sizeof(*id1Type); + extract_int64(id2Type, dest); dest += sizeof(*id2Type); + extract_int64(dataVersion, dest); dest += sizeof(*dataVersion); + extract_int32(&datasize, dest); dest += sizeof(datasize); + extract_int32(&wormsize, dest); dest += sizeof(wormsize); + + data.assign(dest, datasize); dest += datasize; + wormhole_comment.assign(dest, wormsize); dest += wormsize; + } + // fill the timestamp and visibility inline void makeTsVisString(char* dest, int64_t ts, int8_t vis) { dest = copy_int64_switch_endian(dest, ts); @@ -415,10 +447,31 @@ class AssocServiceHandler : virtual public AssocServiceIf { // extracts a int64 type from the char stream. Swaps endianness. inline void extract_int64(int64_t* dest, char* src) { + char* d = (char *)dest; src += sizeof(int64_t) - 1; for (unsigned int i = 0; i < sizeof(uint64_t); i++) { + *d++ = *src--; + } + } + + // + // copy a 4 byte quantity to byte stream. swap endianess. + // + inline char* copy_int32(char* dest, int32_t id) { + char* src = (char *)&id + sizeof(id) - 1; + for (unsigned int i = 0; i < sizeof(id); i++) { *dest++ = *src--; } + return dest; + } + + // extract a 4 byte quantity from a byte stream + inline void extract_int32(int32_t* dest, char* src) { + char* d = (char *)dest; + src += sizeof(int32_t) - 1; + for (unsigned int i = 0; i < sizeof(*dest); i++) { + *d++ = *src--; + } } // extracts a 1 byte integer from the char stream. diff --git a/thrift/openhandles.h b/thrift/openhandles.h index 5052862d3c..a406eca0b5 100644 --- a/thrift/openhandles.h +++ b/thrift/openhandles.h @@ -67,7 +67,8 @@ struct onehandle { unordered_map iterlist; // list of iterators for this database - onehandle() : currentSnapshotId(1), currentIteratorId(1) { + onehandle() : onedb(NULL), + refcount(0), currentSnapshotId(1), currentIteratorId(1) { } // stores a new leveldb snapshot and returns an unique id diff --git a/thrift/test/simpletest.cpp b/thrift/test/simpletest.cpp index 135449e39f..f323be4daa 100644 --- a/thrift/test/simpletest.cpp +++ b/thrift/test/simpletest.cpp @@ -245,15 +245,12 @@ static void testAssocs() { aclient->taoAssocGet(readback, dbname, assocType, id1, id2list); printf("AssocGet suceeded.\n"); - printf("size = %lu\n", readback.size()); - ASSERT_EQ(1, readback.size()); + ASSERT_EQ((unsigned int)1, readback.size()); ASSERT_EQ(id1Type, readback[0].id1Type); - printf("XXX %ld %ld\n", id1Type, readback[0].id1Type); ASSERT_EQ(id2Type, readback[0].id2Type); ASSERT_EQ(ts, readback[0].time); ASSERT_EQ(dataVersion, readback[0].dataVersion); - ASSERT_EQ(readback[0].data.compare(wormhole_comments), 0); - + ASSERT_EQ(readback[0].data.compare(data), 0); } //