From 88c515b6ba73c3688f4c278a31134a930454f3ba Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Fri, 3 Aug 2012 00:27:48 -0700 Subject: [PATCH] Implement taoAssocRangeGet(). Summary: Test Plan: Reviewers: CC: Task ID: # Blame Rev: --- thrift/README | 2 +- thrift/assoc.h | 128 ++++++++++++++++++++++++++++++++----- thrift/if/leveldb.thrift | 4 +- thrift/server_utils.cpp | 16 ++--- thrift/test/simpletest.cpp | 52 ++++++++++++++- 5 files changed, 174 insertions(+), 28 deletions(-) diff --git a/thrift/README b/thrift/README index bd3b336a5a..6405a063a9 100644 --- a/thrift/README +++ b/thrift/README @@ -12,7 +12,7 @@ compiled thrift libraries. If you want to compile leveldb with thrift-server support, please set the following enviroment variables appropriately: USE_THRIFT=1 - LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./thrift/libs + LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./thrift/libs:./snappy/libs make clean leveldb_server leveldb_server_test You can run the leveldb server unit tests by diff --git a/thrift/assoc.h b/thrift/assoc.h index 5e429c990c..7c600b27a6 100644 --- a/thrift/assoc.h +++ b/thrift/assoc.h @@ -56,7 +56,7 @@ class AssocServiceHandler : virtual public AssocServiceIf { int64_t taoAssocDelete(const Text& tableName, int64_t assocType, int64_t id1, int64_t id2, AssocVisibility visibility, bool update_count, const Text& wormhole_comment) { - printf("taoAssocDelete\n"); + printf("taoAssocDelete not implemented yet\n"); return 0; } @@ -64,7 +64,14 @@ class AssocServiceHandler : virtual public AssocServiceIf { const Text& tableName, int64_t assocType, int64_t id1, int64_t start_time, int64_t end_time, int64_t offset, int64_t limit) { - printf("taoAssocRangeGet\n"); + leveldb::DB* db = openhandles_->get(tableName, NULL); + if (db == NULL) { + throw generate_exception(tableName, Code::kNotFound, + "taoAssocRangeGet: Unable to open database " , + assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1); + } + assocRangeGetBytimeInternal(_return, tableName, db, assocType, id1, + start_time, end_time, offset, limit); } void taoAssocGet(std::vector & _return, @@ -73,10 +80,10 @@ class AssocServiceHandler : virtual public AssocServiceIf { leveldb::DB* db = openhandles_->get(tableName, NULL); if (db == NULL) { throw generate_exception(tableName, Code::kNotFound, - "Unable to database " , + "taoAssocGet:Unable to open database " , assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1); } - taoAssocGetInternal(_return, tableName, db, assocType, id1, id2s); + assocGetInternal(_return, tableName, db, assocType, id1, id2s); } int64_t taoAssocCount(const Text& tableName, int64_t assocType, int64_t id1) { @@ -84,17 +91,22 @@ class AssocServiceHandler : virtual public AssocServiceIf { if (db == NULL) { return Code::kNotFound; } - return taoAssocCountInternal(tableName, db, assocType, id1); + return assocCountInternal(tableName, db, assocType, id1); } private: OpenHandles* openhandles_; + // the maximum values returned in a rangeget/multiget call. + const static unsigned int MAX_RANGE_SIZE = 10000; + // - // inserts an assoc - // Returns true if the iinsertion is successful, otherwise return false. + // Inserts an assoc + // If update_count, returns the updated count of the assoc. + // If update_count, return zero. + // On failure, return negative number. // - bool assocPutInternal(const Text& tableName, leveldb::DB* db, + int64_t assocPutInternal(const Text& tableName, leveldb::DB* db, int64_t assocType, int64_t id1, int64_t id2, int64_t id1Type, int64_t id2Type, int64_t ts, AssocVisibility vis, @@ -102,6 +114,7 @@ class AssocServiceHandler : virtual public AssocServiceIf { const Text& wormhole_comment) { leveldb::WriteOptions woptions; woptions.sync = true; + ts = convertTime(ts); // change time to numberofmillis till MAXLONG // create the payload for this assoc int payloadsize = sizeof(id1Type) + sizeof(id2Type) + sizeof(dataVersion) + @@ -174,10 +187,7 @@ class AssocServiceHandler : virtual public AssocServiceIf { // if ts != oldts, then delete 'p'$old_ts$id2 if (!newassoc) { - char* val = (char *)value.c_str(); - extract_int64(&oldts, val); - extract_int8(&oldvis, val + sizeof(int64_t)); - + extractTsVisString(&oldts, &oldvis, (char *)value.c_str()); if (ts != oldts) { if (!db->Delete(woptions, pkey).ok()) { throw generate_exception(tableName, Code::kNotFound, @@ -231,7 +241,7 @@ class AssocServiceHandler : virtual public AssocServiceIf { return 0; } - int64_t taoAssocCountInternal(const Text& tableName, leveldb::DB* db, + int64_t assocCountInternal(const Text& tableName, leveldb::DB* db, int64_t assocType, int64_t id1) { // create key to query int maxkeysize = sizeof(id1) + sizeof(assocType) + 1; @@ -268,14 +278,80 @@ class AssocServiceHandler : virtual public AssocServiceIf { return count; } - void taoAssocGetInternal(std::vector & _return, + void assocRangeGetBytimeInternal(std::vector & _return, + const Text& tableName, leveldb::DB* db, + int64_t assocType, int64_t id1, + int64_t start_time, int64_t end_time, int64_t offset, + int64_t limit) { + if (start_time < end_time) { + throw generate_exception(tableName, Code::kNotFound, + "assocRangeGetBytimeInternal:Bad starttime and endtime\n", + assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1); + } + + int64_t ts, id2; + const leveldb::ReadOptions options; + std::string wormhole; + + // convert times to time-till-LONGMAX + int64_t startTime = convertTime(start_time); + int64_t endTime = convertTime(end_time); + + // create max key to query + int maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(ts) + + sizeof(id2); + std::string dummy; + dummy.reserve(maxkeysize); + dummy.resize(maxkeysize); + + // create rowkey + char* keybuf = &dummy[0]; + int rowkeysize = makeRowKey(keybuf, id1, assocType); + + // Position scan at 'p'$ts$id2 where ts = startTime and id2 = 0 + id2 = 0; + leveldb::Iterator* iter = db->NewIterator(options); + int keysize = appendRowKeyForPayload(rowkeysize, keybuf, startTime, id2); + leveldb::Slice pkey(keybuf, keysize); + + for (iter->Seek(pkey); iter->Valid() && limit > 0 ; iter->Next()) { + // skip over records that the caller is not interested in + if (offset > 0) { + offset--; + continue; + } + printf("XXX server key found %s\n", iter->key().ToString().c_str()); + ASSERT_GE(iter->key().size_, (unsigned int)rowkeysize); + + // extract the timestamp and id1 from the key + extractRowKeyP(&ts, &id2, rowkeysize, (char*)(iter->key().data_)); + ASSERT_GE(ts, startTime); + if (ts > endTime) { + break; + } + + // allocate a new slot in the result set. + _return.resize(_return.size() + 1); + TaoAssocGetResult* result = &_return.back(); + + // Fill up new element in result set. + result->id2 = id2; + result->time = convertTime(ts); + extractPayload((char*)iter->value().data_, &result->id1Type, + &result->id2Type, + &result->dataVersion, result->data, wormhole); + limit--; + } + } + + void assocGetInternal(std::vector & _return, const Text& tableName, leveldb::DB* db, int64_t assocType, int64_t id1, const std::vector & id2s) { int64_t ts, id2; - if (id2s.size() > 10000) { + if (id2s.size() > MAX_RANGE_SIZE) { throw generate_exception(tableName, Code::kNotFound, "Ids2 cannot be gteater than 10K.", assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1); @@ -283,7 +359,6 @@ class AssocServiceHandler : virtual public AssocServiceIf { // allocate the entire result buffer. _return.reserve(id2s.size()); - // create max key to query int maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(ts) + sizeof(id2); @@ -344,7 +419,7 @@ class AssocServiceHandler : virtual public AssocServiceIf { // Fill up new element in result set. result->id2 = id2; - result->time = ts; + result->time = convertTime(ts); extractPayload((char*)value.c_str(), &result->id1Type, &result->id2Type, &result->dataVersion, result->data, wormhole); @@ -374,6 +449,17 @@ class AssocServiceHandler : virtual public AssocServiceIf { dest = copy_int64_switch_endian(dest, id2); return rowkeysize + sizeof(ts) + sizeof(id2) + 1; } + + // extract the timestamp and id2 from the key p$ts$id2 + inline void extractRowKeyP(int64_t* ts, int64_t* id, + int rowkeysize, char* src) { + src += rowkeysize; // skip over the rowkey + ASSERT_EQ(*src, 'p'); + src++; + extract_int64(ts, src); src += sizeof(*ts); + extract_int64(id, src); src += sizeof(*id); + } + // fill the row key +'m' + id2 and returns the size of the key inline int appendRowKeyForMeta(int rowkeysize, char* dest, int64_t id2) { @@ -479,6 +565,14 @@ class AssocServiceHandler : virtual public AssocServiceIf { *dest = *(int8_t *)src; } + // convert a timestamp from an ever-increasing number to + // a decreasing number. All stored timestamps in this database + // are MAXLONG - timestamp. Thus, a backward-scan in time + // is converted to a forward scan in the database. + inline int64_t convertTime(int64_t ts) { + return LONG_MAX - ts; + } + // generate an exception message LeveldbException generate_exception(const Text& tableName, diff --git a/thrift/if/leveldb.thrift b/thrift/if/leveldb.thrift index 36446e06ce..c78c04541d 100644 --- a/thrift/if/leveldb.thrift +++ b/thrift/if/leveldb.thrift @@ -309,7 +309,9 @@ service AssocService { * TAO Assoc RangeGet operation. * Obtain assocs in bewteen start_time and end_time in reverse time order. * The range check is inclusive: start_time >= time && time >= end_time. - * And yes, start_time >= end_time. + * And yes, start_time >= end_time because this range scan is a backward + * scan in time, starting with most recent time and scanning backwards + * for the most recent n assocs. */ list taoAssocRangeGet( /** name of table */ diff --git a/thrift/server_utils.cpp b/thrift/server_utils.cpp index f015db68b9..5e9230940b 100644 --- a/thrift/server_utils.cpp +++ b/thrift/server_utils.cpp @@ -280,7 +280,7 @@ class DBHandler : virtual public DBIf { return; } - // If the iterator has reached the endm close it rightaway. + // If the iterator has reached the end close it rightaway. // There is no need for the application to make another thrift // call to cleanup the iterator. if (!it->Valid()) { @@ -302,19 +302,19 @@ class DBHandler : virtual public DBIf { leveldb::Slice key = it->key(); leveldb::Slice value = it->value(); + // pack results back to client + _return.keyvalue.key.data.assign(key.data_, key.size_); + _return.keyvalue.key.size = key.size_; + _return.keyvalue.value.data.assign(value.data_, value.size_); + _return.keyvalue.value.size = value.size_; + _return.status = Code::kOk; // success + // move to next or previous value if (doNext) { it->Next(); } else { it->Prev(); } - - // pack results back to client - _return.keyvalue.key.data = key.data_; - _return.keyvalue.key.size = key.size_; - _return.keyvalue.value.data = value.data_; - _return.keyvalue.value.size = value.size_; - _return.status = Code::kOk; // success } // read the next value from the iterator diff --git a/thrift/test/simpletest.cpp b/thrift/test/simpletest.cpp index f323be4daa..ed82441aca 100644 --- a/thrift/test/simpletest.cpp +++ b/thrift/test/simpletest.cpp @@ -207,6 +207,34 @@ static void testClient() { printf("Iterator scan-selective backward passes.\n"); } +// clean up all data that we inserted as part of this test +void clearDatabase() { + WriteOptions writeOptions; + ReadOptions readOptions; + readOptions.snapshot.snapshotid = 0; + printf("Clearing entire database.\n"); + + ResultIterator ri; + Slice dummy; + dbclient->NewIterator(ri, dbhandle, readOptions, + IteratorType::seekToFirst, dummy); + ASSERT_TRUE(ri.status == Code::kOk); + while (true) { + ResultPair pair; + dbclient->GetNext(pair, dbhandle, ri.iterator); + if (pair.status == Code::kOk) { + Slice key = pair.keyvalue.key; + Code code = dbclient->Delete(dbhandle, key, writeOptions); + ASSERT_EQ(code, Code::kOk); + } else { + break; + } + } + // no need to invoke DeleteIterator because we scanned + // till the end using the iterator and ist is auto-deleted + // by the server. +} + // // Run assoc tests // @@ -231,7 +259,7 @@ static void testAssocs() { ts, vis, update_count, dataVersion, data, wormhole_comments); ASSERT_GE(count, 0); - printf("Put AssocPut suceeded.\n"); + printf("AssocPut first record suceeded.\n"); // verify assoc counts. int64_t cnt = aclient->taoAssocCount(dbname, assocType, id1); @@ -251,6 +279,25 @@ static void testAssocs() { ASSERT_EQ(ts, readback[0].time); ASSERT_EQ(dataVersion, readback[0].dataVersion); ASSERT_EQ(readback[0].data.compare(data), 0); + + // add one more assoc + const Text data1 = "data1......"; + count = aclient->taoAssocPut(dbname, assocType, + id1, id2+2, id1Type+1, id2Type+1, + ts+1, vis, update_count, + dataVersion+1, data1, wormhole_comments); + ASSERT_EQ(count, 2); + printf("AssocPut second record suceeded.\n"); + + // do a range get for id1+type and verify that there + // are two assocs. + readback.clear(); + int64_t offset = 0; + int64_t limit = 1000; + aclient->taoAssocRangeGet(readback, dbname, assocType, + id1, LONG_MAX, 0, + offset, limit); + ASSERT_EQ((unsigned int)2, readback.size()); } // @@ -282,7 +329,10 @@ int main(int argc, char **argv) { // run all tests testClient(); + clearDatabase(); + testAssocs(); + clearDatabase(); // done all tests close();