diff --git a/snappy-internal.h b/snappy-internal.h
index 1e1c307..80b0ab1 100644
--- a/snappy-internal.h
+++ b/snappy-internal.h
@@ -89,12 +89,18 @@ char* CompressFragment(const char* input,
 // Does not read *(s1 + (s2_limit - s2)) or beyond.
 // Requires that s2_limit >= s2.
 //
+// In addition, populate *data with the next 8 bytes following the end of the
+// match. This is only done if 8 bytes are available (s2_limit - s2 >= 8). The
+// point is that on some architectures this can be done faster within this
+// routine than by a subsequent load from s2 + n.
+//
 // Separate implementation for 64-bit, little-endian cpus.
 #if !defined(SNAPPY_IS_BIG_ENDIAN) && \
     (defined(ARCH_K8) || defined(ARCH_PPC) || defined(ARCH_ARM))
 static inline std::pair<size_t, bool> FindMatchLength(const char* s1,
                                                       const char* s2,
-                                                      const char* s2_limit) {
+                                                      const char* s2_limit,
+                                                      uint64* data) {
   assert(s2_limit >= s2);
   size_t matched = 0;
 
@@ -103,12 +109,28 @@ static inline std::pair<size_t, bool> FindMatchLength(const char* s1,
   // uncommon code paths that determine, without extra effort, whether the match
   // length is less than 8. In short, we are hoping to avoid a conditional
   // branch, and perhaps get better code layout from the C++ compiler.
-  if (SNAPPY_PREDICT_TRUE(s2 <= s2_limit - 8)) {
+  if (SNAPPY_PREDICT_TRUE(s2 <= s2_limit - 16)) {
     uint64 a1 = UNALIGNED_LOAD64(s1);
     uint64 a2 = UNALIGNED_LOAD64(s2);
-    if (a1 != a2) {
-      return std::pair<size_t, bool>(Bits::FindLSBSetNonZero64(a1 ^ a2) >> 3,
-                                     true);
+    if (SNAPPY_PREDICT_TRUE(a1 != a2)) {
+      uint64 xorval = a1 ^ a2;
+      int shift = Bits::FindLSBSetNonZero64(xorval);
+      size_t matched_bytes = shift >> 3;
+#ifndef __x86_64__
+      *data = UNALIGNED_LOAD64(s2 + matched_bytes);
+#else
+      // Unfortunately the compiler cannot generate this from the obvious C++:
+      //   *data = shift == 0 ? a2 : (a2 >> shift) | (a3 << (64 - shift));
+      // The conditional is needed to guard against UB (a shift by 64) when
+      // shift == 0. The compiler does not realize that the full expression
+      // can be lowered into a single "shrd" instruction, which would make the
+      // conditional unnecessary.
+      uint64 a3 = UNALIGNED_LOAD64(s2 + 8);
+      asm("shrdq %%cl, %1, %0\n\t" : "+r"(a2) : "r"(a3), "c"(shift & -8));
+      *data = a2;
+#endif
+      assert(*data == UNALIGNED_LOAD64(s2 + matched_bytes));
+      return std::pair<size_t, bool>(matched_bytes, true);
     } else {
       matched = 8;
       s2 += 8;
@@ -119,14 +141,25 @@ static inline std::pair<size_t, bool> FindMatchLength(const char* s1,
   // time until we find a 64-bit block that doesn't match; then we find
   // the first non-matching bit and use that to calculate the total
   // length of the match.
-  while (SNAPPY_PREDICT_TRUE(s2 <= s2_limit - 8)) {
-    if (UNALIGNED_LOAD64(s2) == UNALIGNED_LOAD64(s1 + matched)) {
+  while (SNAPPY_PREDICT_TRUE(s2 <= s2_limit - 16)) {
+    uint64 a1 = UNALIGNED_LOAD64(s1 + matched);
+    uint64 a2 = UNALIGNED_LOAD64(s2);
+    if (a1 == a2) {
       s2 += 8;
       matched += 8;
     } else {
-      uint64 x = UNALIGNED_LOAD64(s2) ^ UNALIGNED_LOAD64(s1 + matched);
-      int matching_bits = Bits::FindLSBSetNonZero64(x);
-      matched += matching_bits >> 3;
+      uint64 xorval = a1 ^ a2;
+      int shift = Bits::FindLSBSetNonZero64(xorval);
+      size_t matched_bytes = shift >> 3;
+#ifndef __x86_64__
+      *data = UNALIGNED_LOAD64(s2 + matched_bytes);
+#else
+      uint64 a3 = UNALIGNED_LOAD64(s2 + 8);
+      asm("shrdq %%cl, %1, %0\n\t" : "+r"(a2) : "r"(a3), "c"(shift & -8));
+      *data = a2;
+#endif
+      assert(*data == UNALIGNED_LOAD64(s2 + matched_bytes));
+      matched += matched_bytes;
       assert(matched >= 8);
       return std::pair<size_t, bool>(matched, false);
     }
@@ -136,6 +169,9 @@ static inline std::pair<size_t, bool> FindMatchLength(const char* s1,
       ++s2;
       ++matched;
     } else {
+      if (s2 <= s2_limit - 8) {
+        *data = UNALIGNED_LOAD64(s2);
+      }
       return std::pair<size_t, bool>(matched, matched < 8);
     }
   }
@@ -144,7 +180,8 @@ static inline std::pair<size_t, bool> FindMatchLength(const char* s1,
 #else
 static inline std::pair<size_t, bool> FindMatchLength(const char* s1,
                                                       const char* s2,
-                                                      const char* s2_limit) {
+                                                      const char* s2_limit,
+                                                      uint64* data) {
   // Implementation based on the x86-64 version, above.
   assert(s2_limit >= s2);
   int matched = 0;
@@ -164,6 +201,7 @@ static inline std::pair<size_t, bool> FindMatchLength(const char* s1,
       ++matched;
     }
   }
+  *data = UNALIGNED_LOAD64(s2 + matched);
   return std::pair<size_t, bool>(matched, matched < 8);
 }
 #endif
diff --git a/snappy.cc b/snappy.cc
index 6bc3651..0807f13 100644
--- a/snappy.cc
+++ b/snappy.cc
@@ -385,11 +385,18 @@ static inline char* EmitCopyAtMost64(char* op, size_t offset, size_t len) {
   assert(offset < 65536);
   assert(len_less_than_12 == (len < 12));
 
-  if (len_less_than_12 && SNAPPY_PREDICT_TRUE(offset < 2048)) {
-    // offset fits in 11 bits. The 3 highest go in the top of the first byte,
-    // and the rest go in the second byte.
-    *op++ = COPY_1_BYTE_OFFSET + ((len - 4) << 2) + ((offset >> 3) & 0xe0);
-    *op++ = offset & 0xff;
+  if (len_less_than_12) {
+    uint32 u = (len << 2) + (offset << 8);
+    uint32 copy1 = COPY_1_BYTE_OFFSET - (4 << 2) + ((offset >> 3) & 0xe0);
+    uint32 copy2 = COPY_2_BYTE_OFFSET - (1 << 2);
+    // It turns out that offset < 2048 is a difficult-to-predict branch:
+    // `perf record` shows it causes the highest percentage of branch misses
+    // in benchmarks. The code here is branch free; the data-dependency chain
+    // that bottlenecks throughput is so long that a few extra instructions
+    // are effectively free (IPC << 6 because of data deps).
+    u += offset < 2048 ? copy1 : copy2;
+    LittleEndian::Store32(op, u);
+    op += offset < 2048 ? 2 : 3;
   } else {
     // Write 4 bytes, though we only care about 3 of them. The output buffer
     // is required to have some slack, so the extra byte won't overrun it.
@@ -615,7 +622,7 @@ char* CompressFragment(const char* input,
       // "literal bytes" prior to ip.
       const char* base = ip;
       std::pair<size_t, bool> p =
-          FindMatchLength(candidate + 4, ip + 4, ip_end);
+          FindMatchLength(candidate + 4, ip + 4, ip_end, &data);
       size_t matched = 4 + p.first;
       ip += matched;
       size_t offset = base - candidate;
@@ -629,12 +636,12 @@ char* CompressFragment(const char* input,
       if (SNAPPY_PREDICT_FALSE(ip >= ip_limit)) {
         goto emit_remainder;
       }
+      assert(LittleEndian::Load64(ip) == data);
       // We are now looking for a 4-byte match again. We read
       // table[Hash(ip, shift)] for that. To improve compression,
       // we also update table[Hash(ip - 1, shift)] and table[Hash(ip, shift)].
-      data = LittleEndian::Load64(ip - 1);
-      table[HashBytes(data, shift)] = ip - base_ip - 1;
-      data >>= 8;
+      table[HashBytes(LittleEndian::Load32(ip - 1), shift)] =
+          ip - base_ip - 1;
       uint32 hash = HashBytes(data, shift);
       candidate = base_ip + table[hash];
       table[hash] = ip - base_ip;
diff --git a/snappy_unittest.cc b/snappy_unittest.cc
index 37159c3..73d9cf3 100644
--- a/snappy_unittest.cc
+++ b/snappy_unittest.cc
@@ -957,8 +957,9 @@ TEST(Snappy, ZeroOffsetCopyValidation) {
 namespace {
 
 int TestFindMatchLength(const char* s1, const char *s2, unsigned length) {
+  uint64 data;
   std::pair<size_t, bool> p =
-      snappy::internal::FindMatchLength(s1, s2, s2 + length);
+      snappy::internal::FindMatchLength(s1, s2, s2 + length, &data);
  CHECK_EQ(p.first < 8, p.second);
  return p.first;
 }
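
For readers who want to see what the shrdq hunks in snappy-internal.h compute without reading inline asm, here is a standalone C++ sketch. It is illustrative only: Load64 and MatchedBytesAndData are made-up names, it keeps the UB guard that the patch's asm removes, and it assumes a little-endian host and a GCC/Clang-style builtin, matching the preconditions of the code path it mirrors.

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

static uint64_t Load64(const char* p) {
  uint64_t v;
  std::memcpy(&v, p, sizeof(v));  // unaligned load, native byte order
  return v;
}

// For two 8-byte windows that are known to differ, returns the number of
// leading bytes that match and fills *data with the 8 bytes of s2 starting at
// the first mismatch -- the same value the patched FindMatchLength produces
// with shrdq, but assembled from a2/a3 in portable C++.
static std::size_t MatchedBytesAndData(const char* s1, const char* s2,
                                       uint64_t* data) {
  uint64_t a1 = Load64(s1);
  uint64_t a2 = Load64(s2);
  assert(a1 != a2);                     // caller guarantees a mismatch
  uint64_t xorval = a1 ^ a2;
  int shift = __builtin_ctzll(xorval);  // Bits::FindLSBSetNonZero64 equivalent
  std::size_t matched_bytes = shift >> 3;
  uint64_t a3 = Load64(s2 + 8);         // needs 16 readable bytes at s2
  int bit_shift = shift & -8;           // round down to a whole byte count
  // The conditional guards against an undefined shift by 64 when
  // bit_shift == 0; the patch replaces the whole expression with one shrdq,
  // which makes the guard unnecessary.
  *data = bit_shift == 0 ? a2 : (a2 >> bit_shift) | (a3 << (64 - bit_shift));
  assert(*data == Load64(s2 + matched_bytes));
  return matched_bytes;
}

For example, calling it with s1 = "abcdefgh" and s2 = "abcdXfghXXXXXXXX" returns 4 and sets *data to the 8 bytes of s2 starting at the first 'X'.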
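
The EmitCopyAtMost64 hunk may also be easier to follow in isolation. The sketch below is illustrative, not the library's exact code: EmitShortCopy is a made-up name, the tag constants are local stand-ins for COPY_1_BYTE_OFFSET / COPY_2_BYTE_OFFSET, and a plain memcpy stands in for LittleEndian::Store32 (so it assumes a little-endian host). It builds both candidate tag encodings up front and selects one without branching on the hard-to-predict offset < 2048 test.

#include <cassert>
#include <cstdint>
#include <cstring>

// Emits a Snappy copy tag for 4 <= len < 12 and offset < 65536. The output
// buffer needs at least 4 bytes of slack: 4 bytes are always stored even
// though only 2 or 3 of them are kept.
static char* EmitShortCopy(char* op, uint32_t offset, uint32_t len) {
  assert(len >= 4 && len < 12);
  assert(offset < 65536);
  const uint32_t kCopy1ByteOffset = 1;  // Snappy tag type 01
  const uint32_t kCopy2ByteOffset = 2;  // Snappy tag type 10
  // Both encodings are computed; the selection below compiles to a
  // conditional move rather than a branch.
  uint32_t u = (len << 2) + (offset << 8);
  uint32_t copy1 = kCopy1ByteOffset - (4 << 2) + ((offset >> 3) & 0xe0);
  uint32_t copy2 = kCopy2ByteOffset - (1 << 2);
  u += offset < 2048 ? copy1 : copy2;
  std::memcpy(op, &u, sizeof(u));
  // A 1-byte-offset copy occupies 2 bytes, a 2-byte-offset copy 3 bytes.
  return op + (offset < 2048 ? 2 : 3);
}

For offset = 100 and len = 7 this writes the same two tag bytes as the removed *op++ statements; for offset >= 2048 it produces the 2-byte-offset form that previously required taking the else branch.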
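
Finally, the CompressFragment hunk: FindMatchLength now leaves the 8 bytes at the new ip in data, so the hash of ip can be computed from a register instead of re-reading memory; only the ip - 1 entry needs a fresh, and now only 32-bit, load. A simplified sketch follows; UpdateTables, Load32 and this HashBytes are hypothetical stand-ins (the hash constant is illustrative, modeled on snappy's multiplicative hash), and the candidate lookup that sits between the two table writes in the real code is omitted.

#include <cstdint>
#include <cstring>

static uint32_t Load32(const char* p) {
  uint32_t v;
  std::memcpy(&v, p, sizeof(v));
  return v;
}

// Multiplicative hash in the spirit of snappy's HashBytes.
static uint32_t HashBytes(uint32_t bytes, int shift) {
  return (bytes * 0x1e35a7bdu) >> shift;
}

static void UpdateTables(uint16_t* table, const char* base_ip, const char* ip,
                         uint64_t data, int shift) {
  // The entry for ip - 1 still needs a load, but only of 32 bits.
  table[HashBytes(Load32(ip - 1), shift)] =
      static_cast<uint16_t>(ip - base_ip - 1);
  // The entry for ip reuses `data`, which already holds the 8 bytes at ip
  // that FindMatchLength loaded while measuring the match.
  uint32_t hash = HashBytes(static_cast<uint32_t>(data), shift);
  table[hash] = static_cast<uint16_t>(ip - base_ip);
}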