limit max bytes that can be read/written per pread/write syscall

Summary:
A BlockBasedTable sst file can grow very large when universal
compaction is used. When the index block exceeds 2G, pread seems to fail,
returning truncated data and causing a "truncated block" error. I tried to use
```
  #define _FILE_OFFSET_BITS 64
```
But the problem still persists. Splitting a big write/read into smaller
batches seems to solve the problem.

Test Plan:
Successfully compacted a case with a resulting sst file of ~90G (2.1G
index block size).

Reviewers: yhchiang, igor, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D22569
This commit is contained in:
Lei Jin 2014-08-29 21:21:49 -07:00
parent d20b8cfaa1
commit 7e9f28cb23
1 changed files with 35 additions and 13 deletions

View File

@ -239,11 +239,23 @@ class PosixRandomAccessFile: public RandomAccessFile {
char* scratch) const { char* scratch) const {
Status s; Status s;
ssize_t r = -1; ssize_t r = -1;
do { size_t left = n;
r = pread(fd_, scratch, n, static_cast<off_t>(offset)); char* ptr = scratch;
} while (r < 0 && errno == EINTR); while (left > 0) {
IOSTATS_ADD_IF_POSITIVE(bytes_read, r); r = pread(fd_, ptr, left, static_cast<off_t>(offset));
*result = Slice(scratch, (r < 0) ? 0 : r); if (r <= 0) {
if (errno == EINTR) {
continue;
}
break;
}
ptr += r;
offset += r;
left -= r;
}
IOSTATS_ADD_IF_POSITIVE(bytes_read, n - left);
*result = Slice(scratch, (r < 0) ? 0 : n - left);
if (r < 0) { if (r < 0) {
// An error: return a non-ok status // An error: return a non-ok status
s = IOError(filename_, errno); s = IOError(filename_, errno);
@ -907,9 +919,23 @@ class PosixRandomRWFile : public RandomRWFile {
virtual Status Read(uint64_t offset, size_t n, Slice* result, virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const { char* scratch) const {
Status s; Status s;
ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset)); ssize_t r = -1;
IOSTATS_ADD_IF_POSITIVE(bytes_read, r); size_t left = n;
*result = Slice(scratch, (r < 0) ? 0 : r); char* ptr = scratch;
while (left > 0) {
r = pread(fd_, ptr, left, static_cast<off_t>(offset));
if (r <= 0) {
if (errno == EINTR) {
continue;
}
break;
}
ptr += r;
offset += r;
left -= r;
}
IOSTATS_ADD_IF_POSITIVE(bytes_read, n - left);
*result = Slice(scratch, (r < 0) ? 0 : n - left);
if (r < 0) { if (r < 0) {
s = IOError(filename_, errno); s = IOError(filename_, errno);
} }
@ -1018,15 +1044,12 @@ class PosixFileLock : public FileLock {
std::string filename; std::string filename;
}; };
namespace {
void PthreadCall(const char* label, int result) { void PthreadCall(const char* label, int result) {
if (result != 0) { if (result != 0) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
exit(1); exit(1);
} }
} }
}
class PosixEnv : public Env { class PosixEnv : public Env {
public: public:
@ -1724,12 +1747,11 @@ unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
return thread_pools_[pri].GetQueueLen(); return thread_pools_[pri].GetQueueLen();
} }
namespace {
struct StartThreadState { struct StartThreadState {
void (*user_function)(void*); void (*user_function)(void*);
void* arg; void* arg;
}; };
}
static void* StartThreadWrapper(void* arg) { static void* StartThreadWrapper(void* arg) {
StartThreadState* state = reinterpret_cast<StartThreadState*>(arg); StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
state->user_function(state->arg); state->user_function(state->arg);