From 37c62e6b1420fb6c8d8fafe17a6a7b30f0bdd2d4 Mon Sep 17 00:00:00 2001 From: skhillon Date: Wed, 11 Feb 2026 12:25:56 -0800 Subject: [PATCH 1/3] WAL tailing reader should resume partial cell reads instead of resetting compression --- .../hbase/io/TagCompressionContext.java | 50 +++- .../hadoop/hbase/io/util/Dictionary.java | 5 + .../hadoop/hbase/io/util/LRUDictionary.java | 5 + .../wal/ProtobufWALTailingReader.java | 111 +++++-- .../hbase/regionserver/wal/WALCellCodec.java | 68 ++++- ...ompressedKvDecoderDeferredDictUpdates.java | 247 ++++++++++++++++ ...TestWALTailingReaderPartialCellResume.java | 270 ++++++++++++++++++ 7 files changed, 730 insertions(+), 26 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressedKvDecoderDeferredDictUpdates.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALTailingReaderPartialCellResume.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java index f938fdaab35b..bf0951ee3970 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java @@ -23,6 +23,8 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.io.util.Dictionary; import org.apache.hadoop.hbase.io.util.StreamUtils; @@ -39,6 +41,9 @@ @InterfaceAudience.Private public class TagCompressionContext { private final Dictionary tagDict; + private boolean deferAdditions = false; + private final List deferredAdditions = new ArrayList<>(); + private int deferredBaseIndex; public TagCompressionContext(Class dictType, int dictCapacity) throws SecurityException, NoSuchMethodException, InstantiationException, IllegalAccessException, @@ -52,6 +57,27 @@ public void clear() { tagDict.clear(); } + public void setDeferAdditions(boolean defer) { + this.deferAdditions = defer; + if (defer) { + deferredBaseIndex = tagDict.size(); + deferredAdditions.clear(); + } + } + + public void commitDeferredAdditions() { + for (byte[] entry : deferredAdditions) { + tagDict.addEntry(entry, 0, entry.length); + } + deferredAdditions.clear(); + deferAdditions = false; + } + + public void clearDeferredAdditions() { + deferredAdditions.clear(); + deferAdditions = false; + } + /** * Compress tags one by one and writes to the OutputStream. 
* @param out Stream to which the compressed tags to be written @@ -112,11 +138,17 @@ public void uncompressTags(InputStream src, byte[] dest, int offset, int length) int tagLen = StreamUtils.readRawVarint32(src); offset = Bytes.putAsShort(dest, offset, tagLen); IOUtils.readFully(src, dest, offset, tagLen); - tagDict.addEntry(dest, offset, tagLen); + if (deferAdditions) { + byte[] copy = new byte[tagLen]; + System.arraycopy(dest, offset, copy, 0, tagLen); + deferredAdditions.add(copy); + } else { + tagDict.addEntry(dest, offset, tagLen); + } offset += tagLen; } else { short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(src)); - byte[] entry = tagDict.getEntry(dictIdx); + byte[] entry = getDeferredOrDictEntry(dictIdx); if (entry == null) { throw new IOException("Missing dictionary entry for index " + dictIdx); } @@ -127,6 +159,20 @@ public void uncompressTags(InputStream src, byte[] dest, int offset, int length) } } + private byte[] getDeferredOrDictEntry(short dictIdx) { + if (deferAdditions) { + int deferredIdx = dictIdx - deferredBaseIndex; + if (deferredIdx >= 0 && deferredIdx < deferredAdditions.size()) { + return deferredAdditions.get(deferredIdx); + } + } + try { + return tagDict.getEntry(dictIdx); + } catch (IndexOutOfBoundsException e) { + return null; + } + } + /** * Uncompress tags from the input ByteBuffer and writes to the destination array. * @param src Buffer where the compressed tags are available diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java index b1ab8a9b28d4..3d15cb2c6f43 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java @@ -69,6 +69,11 @@ public interface Dictionary { */ short addEntry(byte[] data, int offset, int length); + /** + * Returns the number of entries in the dictionary. + */ + int size(); + /** * Flushes the dictionary, empties all values. 
*/ diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java index 4089863d4387..93db6f41dbb3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java @@ -67,6 +67,11 @@ private short addEntryInternal(byte[] data, int offset, int length, boolean copy return backingStore.put(data, offset, length, copy); } + @Override + public int size() { + return backingStore.currSize; + } + @Override public void clear() { backingStore.clear(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java index 6cf141d7053e..a3184b6685ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java @@ -30,11 +30,9 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; @@ -50,6 +48,10 @@ public class ProtobufWALTailingReader extends AbstractProtobufWALReader private DelegatingInputStream delegatingInput; + private Entry pendingEntry = null; + private int pendingRemainingCells = 0; + private long pendingResumePosition = -1; + private static final class ReadWALKeyResult { final State state; final Entry entry; @@ -184,6 +186,10 @@ private Result editError() { } private Result readWALEdit(Entry entry, int followingKvCount) { + return readCellsIntoEntry(entry, followingKvCount, false); + } + + private Result readCellsIntoEntry(Entry entry, int remainingCells, boolean isResume) { long posBefore; try { posBefore = inputStream.getPos(); @@ -191,30 +197,44 @@ private Result readWALEdit(Entry entry, int followingKvCount) { LOG.warn("failed to get position", e); return State.ERROR_AND_RESET.getResult(); } - if (followingKvCount == 0) { - LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}", - posBefore); + if (remainingCells == 0) { + if (!isResume) { + LOG.trace("WALKey has no KVs that follow it; trying the next one. 
current offset={}", + posBefore); + } return new Result(State.NORMAL, entry, posBefore); } - int actualCells; - try { - actualCells = entry.getEdit().readFromCells(cellDecoder, followingKvCount); - } catch (Exception e) { - String message = " while reading " + followingKvCount + " WAL KVs; started reading at " - + posBefore + " and read up to " + getPositionQuietly(); - IOException realEofEx = extractHiddenEof(e); - if (realEofEx != null) { - LOG.warn("EOF " + message, realEofEx); - return editEof(); - } else { - LOG.warn("Error " + message, e); + long lastGoodPos = posBefore; + int cellsRead = 0; + for (int i = 0; i < remainingCells; i++) { + try { + lastGoodPos = inputStream.getPos(); + } catch (IOException e) { + LOG.warn("failed to get position before cell read", e); return editError(); } - } - if (actualCells != followingKvCount) { - LOG.warn("Only read {} cells, expected {}; started reading at {} and read up to {}", - actualCells, followingKvCount, posBefore, getPositionQuietly()); - return editEof(); + boolean advanced; + try { + advanced = cellDecoder.advance(); + } catch (Exception e) { + IOException realEofEx = extractHiddenEof(e); + if (realEofEx != null) { + LOG.debug("EOF after reading {} of {} cells; started reading at {}, last good pos={}", + cellsRead, remainingCells, posBefore, lastGoodPos, realEofEx); + return savePendingAndReturnEof(entry, remainingCells - cellsRead, lastGoodPos); + } else { + LOG.warn("Error after reading {} of {} cells; started reading at {}, read up to {}", + cellsRead, remainingCells, posBefore, getPositionQuietly(), e); + return editError(); + } + } + if (!advanced) { + LOG.debug("EOF (advance returned false) after reading {} of {} cells; started at {}," + + " last good pos={}", cellsRead, remainingCells, posBefore, lastGoodPos); + return savePendingAndReturnEof(entry, remainingCells - cellsRead, lastGoodPos); + } + entry.getEdit().add(cellDecoder.current()); + cellsRead++; } long posAfter; try { @@ -231,8 +251,45 @@ private Result readWALEdit(Entry entry, int followingKvCount) { return new Result(State.NORMAL, entry, posAfter); } + private Result savePendingAndReturnEof(Entry entry, int remaining, long resumePos) { + if (hasCompression) { + pendingEntry = entry; + pendingRemainingCells = remaining; + pendingResumePosition = resumePos; + return new Result(State.EOF_AND_RESET, null, resumePos); + } + return editEof(); + } + + private void clearPendingState() { + pendingEntry = null; + pendingRemainingCells = 0; + pendingResumePosition = -1; + } + @Override public Result next(long limit) { + if (pendingEntry != null) { + long originalPosition; + try { + originalPosition = inputStream.getPos(); + } catch (IOException e) { + LOG.warn("failed to get position", e); + clearPendingState(); + return State.EOF_AND_RESET.getResult(); + } + if (limit < 0) { + delegatingInput.setDelegate(inputStream); + } else if (limit <= originalPosition) { + return State.EOF_AND_RESET.getResult(); + } else { + delegatingInput.setDelegate(ByteStreams.limit(inputStream, limit - originalPosition)); + } + Entry entry = pendingEntry; + int remaining = pendingRemainingCells; + clearPendingState(); + return readCellsIntoEntry(entry, remaining, true); + } long originalPosition; try { originalPosition = inputStream.getPos(); @@ -268,6 +325,13 @@ private void skipHeader(FSDataInputStream stream) throws IOException { @Override public void resetTo(long position, boolean resetCompression) throws IOException { + if (resetCompression) { + clearPendingState(); + } + long seekPosition = 
position; + if (!resetCompression && pendingResumePosition > 0) { + seekPosition = pendingResumePosition; + } close(); Pair pair = open(); boolean resetSucceed = false; @@ -283,6 +347,7 @@ public void resetTo(long position, boolean resetCompression) throws IOException if (compressionCtx != null) { compressionCtx.clear(); } + clearPendingState(); skipHeader(inputStream); } else if (resetCompression && compressionCtx != null) { // clear compressCtx and skip to the expected position, to fill up the dictionary @@ -293,7 +358,7 @@ public void resetTo(long position, boolean resetCompression) throws IOException } } else { // just seek to the expected position - inputStream.seek(position); + inputStream.seek(seekPosition); } resetSucceed = true; } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 8645f6054f89..856c6669bbe1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.ExtendedCell; @@ -280,6 +282,32 @@ static class CompressedKvDecoder extends BaseDecoder { private final boolean hasValueCompression; private final boolean hasTagCompression; + // When the WAL tailing reader hits EOF mid-cell, the compression dictionaries must remain + // in the state they were after the last fully-read cell. Otherwise the reader would need + // an expensive O(n) reset (re-read from the start of the file to rebuild dictionary state). + // To achieve this, dictionary additions for ROW, FAMILY, and QUALIFIER are buffered here + // and only flushed on successful cell parse. On failure, they are discarded. + // Tag dictionary additions are deferred similarly via TagCompressionContext. + private final List pendingDictAdditions = new ArrayList<>(); + + // Tracks whether we are in the value decompression phase of parseCellInner(), so that on + // IOException we know whether the ValueCompressor's internal state needs to be reset. 
+ private boolean readingValue = false; + + private static class PendingDictAddition { + final Dictionary dict; + final byte[] data; + final int offset; + final int length; + + PendingDictAddition(Dictionary dict, byte[] data, int offset, int length) { + this.dict = dict; + this.data = data; + this.offset = offset; + this.length = length; + } + } + public CompressedKvDecoder(InputStream in, CompressionContext compression) { super(in); this.compression = compression; @@ -287,8 +315,44 @@ public CompressedKvDecoder(InputStream in, CompressionContext compression) { this.hasTagCompression = compression.hasTagCompression(); } + private void commitPendingAdditions() { + for (PendingDictAddition pending : pendingDictAdditions) { + pending.dict.addEntry(pending.data, pending.offset, pending.length); + } + pendingDictAdditions.clear(); + if (hasTagCompression) { + compression.tagCompressionContext.commitDeferredAdditions(); + } + } + + private void clearPendingAdditions() { + pendingDictAdditions.clear(); + if (hasTagCompression) { + compression.tagCompressionContext.clearDeferredAdditions(); + } + } + @Override protected ExtendedCell parseCell() throws IOException { + clearPendingAdditions(); + if (hasTagCompression) { + compression.tagCompressionContext.setDeferAdditions(true); + } + readingValue = false; + try { + ExtendedCell cell = parseCellInner(); + commitPendingAdditions(); + return cell; + } catch (IOException e) { + clearPendingAdditions(); + if (readingValue && hasValueCompression) { + compression.getValueCompressor().clear(); + } + throw e; + } + } + + private ExtendedCell parseCellInner() throws IOException { int keylength = StreamUtils.readRawVarint32(in); int vlength = StreamUtils.readRawVarint32(in); int tagsLength = StreamUtils.readRawVarint32(in); @@ -334,7 +398,9 @@ protected ExtendedCell parseCell() throws IOException { pos = Bytes.putByte(backingArray, pos, (byte) in.read()); int valLen = typeValLen - 1; if (hasValueCompression) { + readingValue = true; readCompressedValue(in, backingArray, pos, valLen); + readingValue = false; pos += valLen; } else { IOUtils.readFully(in, backingArray, pos, valLen); @@ -359,7 +425,7 @@ private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOExcep // if this isn't in the dictionary, we need to add to the dictionary. int length = StreamUtils.readRawVarint32(in); IOUtils.readFully(in, to, offset, length); - dict.addEntry(to, offset, length); + pendingDictAdditions.add(new PendingDictAddition(dict, to, offset, length)); return length; } else { // the status byte also acts as the higher order byte of the dictionary entry. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressedKvDecoderDeferredDictUpdates.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressedKvDecoderDeferredDictUpdates.java new file mode 100644 index 000000000000..39361d035ff2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressedKvDecoderDeferredDictUpdates.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ArrayBackedTag; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.util.Dictionary; +import org.apache.hadoop.hbase.io.util.LRUDictionary; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestCompressedKvDecoderDeferredDictUpdates { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCompressedKvDecoderDeferredDictUpdates.class); + + private static final byte[] FAMILY = Bytes.toBytes("cf"); + + private static final Set CELL_DICTIONARIES = + EnumSet.of(CompressionContext.DictionaryIndex.ROW, CompressionContext.DictionaryIndex.FAMILY, + CompressionContext.DictionaryIndex.QUALIFIER); + + private static byte[] safeGetEntry(Dictionary dict, short idx) { + try { + return dict.getEntry(idx); + } catch (IndexOutOfBoundsException e) { + return null; + } + } + + private KeyValue createKV(String row, String qualifier, String value, int numTags) { + List tags = new ArrayList<>(numTags); + for (int i = 1; i <= numTags; i++) { + tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tag" + row + "-" + i))); + } + return new KeyValue(Bytes.toBytes(row), FAMILY, Bytes.toBytes(qualifier), + HConstants.LATEST_TIMESTAMP, Bytes.toBytes(value), tags); + } + + private byte[] encodeCells(List cells, CompressionContext ctx, Configuration conf) + throws IOException { + WALCellCodec codec = new WALCellCodec(conf, ctx); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + Codec.Encoder encoder = codec.getEncoder(bos); + for (KeyValue kv : cells) { + encoder.write(kv); + } + encoder.flush(); + return bos.toByteArray(); + } + + private int readUntilEof(Codec.Decoder decoder) throws IOException { + int count = 0; + boolean hitEof = false; + while (!hitEof) { + try { + if (!decoder.advance()) { + hitEof = true; + } else { + count++; + } + } catch (Exception e) { + hitEof = true; + } + } + return count; + } + + private void assertDictionariesMatch(CompressionContext actual, CompressionContext expected, + int truncLen, int successfulCells) { + for (CompressionContext.DictionaryIndex idx : 
CELL_DICTIONARIES) { + Dictionary actualDict = actual.getDictionary(idx); + Dictionary expectedDict = expected.getDictionary(idx); + for (short s = 0; s < Short.MAX_VALUE; s++) { + byte[] actualEntry = safeGetEntry(actualDict, s); + byte[] expectedEntry = safeGetEntry(expectedDict, s); + if (actualEntry == null && expectedEntry == null) { + break; + } + assertArrayEquals( + String.format("Dictionary %s entry %d mismatch at truncLen=%d, successfulCells=%d", idx, + s, truncLen, successfulCells), + expectedEntry, actualEntry); + } + } + } + + private void verifyDictsMatchAfterTruncation(List cells, boolean hasTagCompression, + boolean hasValueCompression, Compression.Algorithm valueAlgo) throws Exception { + Configuration conf = new Configuration(false); + if (hasTagCompression) { + conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); + } + CompressionContext writeCtx = + new CompressionContext(LRUDictionary.class, false, hasTagCompression, hasValueCompression, + hasValueCompression ? valueAlgo : Compression.Algorithm.NONE); + byte[] fullData = encodeCells(cells, writeCtx, conf); + + for (int truncLen = 1; truncLen < fullData.length; truncLen++) { + byte[] truncated = Arrays.copyOf(fullData, truncLen); + CompressionContext readCtx = + new CompressionContext(LRUDictionary.class, false, hasTagCompression, hasValueCompression, + hasValueCompression ? valueAlgo : Compression.Algorithm.NONE); + Codec.Decoder decoder = + new WALCellCodec(conf, readCtx).getDecoder(new ByteArrayInputStream(truncated)); + int successfulCells = readUntilEof(decoder); + + CompressionContext verifyCtx = + new CompressionContext(LRUDictionary.class, false, hasTagCompression, hasValueCompression, + hasValueCompression ? valueAlgo : Compression.Algorithm.NONE); + Codec.Decoder verifyDecoder = + new WALCellCodec(conf, verifyCtx).getDecoder(new ByteArrayInputStream(fullData)); + for (int i = 0; i < successfulCells; i++) { + assertTrue("verifyDecoder.advance() should return true for cell " + i, + verifyDecoder.advance()); + } + + assertDictionariesMatch(readCtx, verifyCtx, truncLen, successfulCells); + } + } + + @Test + public void itPreservesDictionaryStateOnTruncatedStream() throws Exception { + List cells = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + cells.add(createKV("row-" + i, "qual-" + i, "value-" + i, 0)); + } + verifyDictsMatchAfterTruncation(cells, false, false, null); + } + + @Test + public void itPreservesDictionaryStateWithTagCompression() throws Exception { + List cells = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + cells.add(createKV("row-" + i, "qual-" + i, "value-" + i, 2)); + } + verifyDictsMatchAfterTruncation(cells, true, false, null); + } + + @Test + public void itPreservesDictionaryStateWithValueCompression() throws Exception { + List cells = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + byte[] value = new byte[64]; + Bytes.random(value); + cells.add(createKV("row-" + i, "qual-" + i, Bytes.toString(value), 0)); + } + verifyDictsMatchAfterTruncation(cells, false, true, Compression.Algorithm.GZ); + } + + @Test + public void itCanResumeAfterTruncation() throws Exception { + List cells = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + cells.add(createKV("row-" + i, "qual-" + i, "value-" + i, 0)); + } + + Configuration conf = new Configuration(false); + CompressionContext writeCtx = new CompressionContext(LRUDictionary.class, false, false); + byte[] fullData = encodeCells(cells, writeCtx, conf); + + int[] cellEndOffsets = new int[cells.size()]; + { + 
CompressionContext scanCtx = new CompressionContext(LRUDictionary.class, false, false); + WALCellCodec scanCodec = new WALCellCodec(conf, scanCtx); + for (int i = 0; i < cells.size(); i++) { + ByteArrayOutputStream cellBos = new ByteArrayOutputStream(); + Codec.Encoder cellEncoder = scanCodec.getEncoder(cellBos); + cellEncoder.write(cells.get(i)); + cellEncoder.flush(); + cellEndOffsets[i] = (i == 0) ? cellBos.size() : cellEndOffsets[i - 1] + cellBos.size(); + } + } + + for (int cellIdx = 0; cellIdx < cells.size() - 1; cellIdx++) { + int truncPoint = cellEndOffsets[cellIdx] + 1; + if (truncPoint >= fullData.length) { + continue; + } + byte[] truncated = Arrays.copyOf(fullData, truncPoint); + + CompressionContext readCtx = new CompressionContext(LRUDictionary.class, false, false); + WALCellCodec readCodec = new WALCellCodec(conf, readCtx); + Codec.Decoder decoder = readCodec.getDecoder(new ByteArrayInputStream(truncated)); + int successfulCells = readUntilEof(decoder); + assertEquals("successfulCells at cellIdx=" + cellIdx, cellIdx + 1, successfulCells); + + int resumeOffset = cellEndOffsets[cellIdx]; + Codec.Decoder resumeDecoder = readCodec.getDecoder( + new ByteArrayInputStream(fullData, resumeOffset, fullData.length - resumeOffset)); + + CompressionContext verifyCtx = new CompressionContext(LRUDictionary.class, false, false); + Codec.Decoder verifyDecoder = + new WALCellCodec(conf, verifyCtx).getDecoder(new ByteArrayInputStream(fullData)); + for (int i = 0; i < successfulCells; i++) { + assertTrue(verifyDecoder.advance()); + } + + for (int i = successfulCells; i < cells.size(); i++) { + assertTrue("resume should advance for cell " + i, resumeDecoder.advance()); + assertTrue("verify should advance for cell " + i, verifyDecoder.advance()); + assertArrayEquals(String.format("cell %d content mismatch at cellIdx=%d", i, cellIdx), + ((KeyValue) verifyDecoder.current()).getBuffer(), + ((KeyValue) resumeDecoder.current()).getBuffer()); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALTailingReaderPartialCellResume.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALTailingReaderPartialCellResume.java new file mode 100644 index 000000000000..39681ed3fbef --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALTailingReaderPartialCellResume.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.wal; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestWALTailingReaderPartialCellResume { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALTailingReaderPartialCellResume.class); + + private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); + + private static FileSystem FS; + + private static final TableName TN = TableName.valueOf("test"); + private static final RegionInfo RI = RegionInfoBuilder.newBuilder(TN).build(); + private static final byte[] FAMILY = Bytes.toBytes("family"); + + @BeforeClass + public static void setUp() throws IOException { + UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false); + UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + FS = FileSystem.getLocal(UTIL.getConfiguration()); + if (!FS.mkdirs(UTIL.getDataTestDir())) { + throw new IOException("can not create " + UTIL.getDataTestDir()); + } + } + + @AfterClass + public static void tearDown() { + UTIL.cleanupTestDir(); + } + + private WAL.Entry createEntry(int index, int numCells) { + WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, index, + EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID); + WALEdit edit = new WALEdit(); + for (int c = 0; c < numCells; c++) { + edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) + .setRow(Bytes.toBytes("row-" + index)).setFamily(FAMILY) + .setQualifier(Bytes.toBytes("qual-" + index + "-" + c)) + .setValue(Bytes.toBytes("value-" + index + "-" + c)).build()); + } + return new WAL.Entry(key, edit); + } + + @Test + public void itReturnsEofAndResetNotCompressionOnPartialEntry() throws Exception { + Path walFile = UTIL.getDataTestDir("wal-partial"); + List endOffsets = new ArrayList<>(); + try (WALProvider.Writer writer = + WALFactory.createWALWriter(FS, walFile, UTIL.getConfiguration())) { + for (int i = 0; i < 5; i++) { + writer.append(createEntry(i, 1)); + writer.sync(true); + endOffsets.add(writer.getLength()); + } + writer.append(createEntry(5, 3)); + writer.sync(true); + endOffsets.add(writer.getLength()); + } + + long 
fileLength = FS.getFileStatus(walFile).getLen(); + byte[] content = new byte[(int) fileLength]; + try (FSDataInputStream in = FS.open(walFile)) { + in.readFully(content); + } + + long lastSingleCellEnd = endOffsets.get(4); + int truncPoint = (int) (lastSingleCellEnd + (endOffsets.get(5) - lastSingleCellEnd) / 2); + Path truncFile = UTIL.getDataTestDir("wal-trunc"); + try (FSDataOutputStream out = FS.create(truncFile)) { + out.write(content, 0, truncPoint); + } + + try (WALTailingReader reader = + WALFactory.createTailingReader(FS, truncFile, UTIL.getConfiguration(), -1)) { + for (int i = 0; i < 5; i++) { + WALTailingReader.Result result = reader.next(-1); + assertEquals("State should be NORMAL for entry " + i, WALTailingReader.State.NORMAL, + result.getState()); + assertArrayEquals("Row should match for entry " + i, Bytes.toBytes("row-" + i), + CellUtil.cloneRow(result.getEntry().getEdit().getCells().get(0))); + } + + WALTailingReader.Result eofResult = reader.next(-1); + assertEquals( + "With deferred dict updates, EOF mid-cell should return EOF_AND_RESET," + + " not EOF_AND_RESET_COMPRESSION", + WALTailingReader.State.EOF_AND_RESET, eofResult.getState()); + } + } + + @Test + public void itResumesPartialEntryAfterReset() throws Exception { + Path walFile = UTIL.getDataTestDir("wal-resume"); + List endOffsets = new ArrayList<>(); + try (WALProvider.Writer writer = + WALFactory.createWALWriter(FS, walFile, UTIL.getConfiguration())) { + for (int i = 0; i < 3; i++) { + writer.append(createEntry(i, 1)); + writer.sync(true); + endOffsets.add(writer.getLength()); + } + writer.append(createEntry(3, 2)); + writer.sync(true); + endOffsets.add(writer.getLength()); + } + + long fileLength = FS.getFileStatus(walFile).getLen(); + byte[] content = new byte[(int) fileLength]; + try (FSDataInputStream in = FS.open(walFile)) { + in.readFully(content); + } + + long lastSingleEnd = endOffsets.get(2); + int truncPoint = (int) (lastSingleEnd + (endOffsets.get(3) - lastSingleEnd) / 2); + Path truncFile = UTIL.getDataTestDir("wal-resume-trunc"); + try (FSDataOutputStream out = FS.create(truncFile)) { + out.write(content, 0, truncPoint); + } + + try (WALTailingReader reader = + WALFactory.createTailingReader(FS, truncFile, UTIL.getConfiguration(), -1)) { + for (int i = 0; i < 3; i++) { + WALTailingReader.Result result = reader.next(-1); + assertEquals("State should be NORMAL for entry " + i, WALTailingReader.State.NORMAL, + result.getState()); + } + + WALTailingReader.Result eofResult = reader.next(-1); + assertEquals(WALTailingReader.State.EOF_AND_RESET, eofResult.getState()); + long eofPos = eofResult.getEntryEndPos(); + + FS.delete(truncFile, false); + try (FSDataOutputStream out = FS.create(truncFile)) { + out.write(content, 0, content.length); + } + + reader.resetTo(eofPos, false); + WALTailingReader.Result resumeResult = reader.next(-1); + assertEquals(WALTailingReader.State.NORMAL, resumeResult.getState()); + WAL.Entry entry = resumeResult.getEntry(); + assertEquals(2, entry.getEdit().getCells().size()); + assertArrayEquals(Bytes.toBytes("row-3"), + CellUtil.cloneRow(entry.getEdit().getCells().get(0))); + assertArrayEquals(Bytes.toBytes("qual-3-0"), + CellUtil.cloneQualifier(entry.getEdit().getCells().get(0))); + assertArrayEquals(Bytes.toBytes("qual-3-1"), + CellUtil.cloneQualifier(entry.getEdit().getCells().get(1))); + } + } + + @Test + public void itHandlesMultipleConsecutivePartialReads() throws Exception { + Path walFile = UTIL.getDataTestDir("wal-multi-partial"); + List endOffsets = new 
ArrayList<>(); + try (WALProvider.Writer writer = + WALFactory.createWALWriter(FS, walFile, UTIL.getConfiguration())) { + for (int i = 0; i < 2; i++) { + writer.append(createEntry(i, 1)); + writer.sync(true); + endOffsets.add(writer.getLength()); + } + writer.append(createEntry(2, 3)); + writer.sync(true); + endOffsets.add(writer.getLength()); + } + + long fileLength = FS.getFileStatus(walFile).getLen(); + byte[] content = new byte[(int) fileLength]; + try (FSDataInputStream in = FS.open(walFile)) { + in.readFully(content); + } + + long multiCellStart = endOffsets.get(1); + long multiCellEnd = endOffsets.get(2); + int midRange = (int) (multiCellEnd - multiCellStart); + + int truncPoint1 = (int) multiCellStart + midRange / 4; + int truncPoint2 = (int) multiCellStart + midRange / 2; + + Path truncFile = UTIL.getDataTestDir("wal-multi-partial-trunc"); + try (FSDataOutputStream out = FS.create(truncFile)) { + out.write(content, 0, truncPoint1); + } + + try (WALTailingReader reader = + WALFactory.createTailingReader(FS, truncFile, UTIL.getConfiguration(), -1)) { + for (int i = 0; i < 2; i++) { + WALTailingReader.Result result = reader.next(-1); + assertEquals(WALTailingReader.State.NORMAL, result.getState()); + } + + WALTailingReader.Result eof1 = reader.next(-1); + assertEquals(WALTailingReader.State.EOF_AND_RESET, eof1.getState()); + long pos1 = eof1.getEntryEndPos(); + + FS.delete(truncFile, false); + try (FSDataOutputStream out = FS.create(truncFile)) { + out.write(content, 0, truncPoint2); + } + reader.resetTo(pos1, false); + + WALTailingReader.Result eof2 = reader.next(-1); + assertEquals(WALTailingReader.State.EOF_AND_RESET, eof2.getState()); + long pos2 = eof2.getEntryEndPos(); + + FS.delete(truncFile, false); + try (FSDataOutputStream out = FS.create(truncFile)) { + out.write(content, 0, content.length); + } + reader.resetTo(pos2, false); + + WALTailingReader.Result finalResult = reader.next(-1); + assertEquals(WALTailingReader.State.NORMAL, finalResult.getState()); + WAL.Entry entry = finalResult.getEntry(); + assertEquals(3, entry.getEdit().getCells().size()); + assertArrayEquals(Bytes.toBytes("row-2"), + CellUtil.cloneRow(entry.getEdit().getCells().get(0))); + } + } +} From 106592273b35986d4e076513fbda09b6c2bbc26b Mon Sep 17 00:00:00 2001 From: skhillon Date: Wed, 11 Feb 2026 12:30:11 -0800 Subject: [PATCH 2/3] Spotless --- .../hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java index a3184b6685ac..d4c1fe20e5db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java @@ -30,9 +30,11 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; From 874e4ff9117b30328482b1f35db289a9309bdbd4 Mon Sep 17 00:00:00 2001 From: skhillon Date: Wed, 11 Feb 2026 13:04:08 -0800 
Subject: [PATCH 3/3] Use renamed testing utility --- .../hbase/wal/TestWALTailingReaderPartialCellResume.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALTailingReaderPartialCellResume.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALTailingReaderPartialCellResume.java index 39681ed3fbef..c0cfe83c548a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALTailingReaderPartialCellResume.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALTailingReaderPartialCellResume.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.HBaseCommonTestingUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; @@ -55,7 +55,7 @@ public class TestWALTailingReaderPartialCellResume { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestWALTailingReaderPartialCellResume.class); - private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); + private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(); private static FileSystem FS;
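
---
Note (not part of the patches above): the core idea in PATCH 1/3 is that CompressedKvDecoder buffers dictionary additions in pendingDictAdditions while a cell is being parsed and only commits them once the cell parses completely, so a truncated read leaves the dictionaries exactly where they were after the last good cell. The following self-contained toy sketch illustrates that defer-then-commit pattern with a length-prefixed stream; the class name, readRecord(), and the record framing are hypothetical stand-ins, not HBase APIs.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: buffer per-record dictionary additions and commit
// them only after the whole record parses, mirroring the intent of
// parseCell()/parseCellInner() and commitPendingAdditions() in the patch.
public final class DeferredDictSketch {

  private final List<byte[]> dictionary = new ArrayList<>(); // committed entries
  private final List<byte[]> pending = new ArrayList<>();    // current record only

  /** Reads one record of N length-prefixed tokens, or throws EOFException mid-record. */
  public List<byte[]> readRecord(DataInputStream in, int tokens) throws IOException {
    pending.clear();
    try {
      List<byte[]> out = new ArrayList<>();
      for (int i = 0; i < tokens; i++) {
        int len = in.readInt();          // may hit EOF mid-record
        byte[] token = new byte[len];
        in.readFully(token);             // may hit EOF mid-record
        pending.add(token);              // deferred, like pendingDictAdditions
        out.add(token);
      }
      dictionary.addAll(pending);        // commit: record fully parsed
      return out;
    } catch (EOFException e) {
      pending.clear();                   // discard: dictionary stays at last good record
      throw e;
    }
  }

  public int dictionarySize() {
    return dictionary.size();
  }

  public static void main(String[] args) throws IOException {
    byte[] full = new byte[] { 0, 0, 0, 1, 'a', 0, 0, 0, 1, 'b' };
    byte[] truncated = new byte[] { 0, 0, 0, 1, 'a', 0, 0, 0, 1 }; // cut mid-token

    DeferredDictSketch complete = new DeferredDictSketch();
    complete.readRecord(new DataInputStream(new ByteArrayInputStream(full)), 2);
    System.out.println(complete.dictionarySize()); // 2: both tokens committed

    DeferredDictSketch partial = new DeferredDictSketch();
    try {
      partial.readRecord(new DataInputStream(new ByteArrayInputStream(truncated)), 2);
    } catch (EOFException expected) {
      // truncation hit mid-record
    }
    System.out.println(partial.dictionarySize()); // 0: nothing committed, safe to resume
  }
}

In the same spirit, ProtobufWALTailingReader can answer EOF_AND_RESET instead of EOF_AND_RESET_COMPRESSION on a mid-cell EOF, because nothing the partial cell read has to be undone in the compression state.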