Skip to content

Commit

Permalink
HDDS-11763. Added test cases and fixed bugs.
Browse files Browse the repository at this point in the history
  • Loading branch information
aswinshakil committed Jan 6, 2025
1 parent e294940 commit 99de480
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public class DatanodeStateMachine implements Closeable {

private final DatanodeQueueMetrics queueMetrics;
private final ReconfigurationHandler reconfigurationHandler;
private final DNContainerOperationClient dnClient;
/**
* Constructs a datanode state machine.
* @param datanodeDetails - DatanodeDetails used to identify a datanode
Expand Down Expand Up @@ -230,7 +231,7 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,

// TODO HDDS-11218 combine the clients used for reconstruction and reconciliation so they share the same cache of
// datanode clients.
DNContainerOperationClient dnClient = new DNContainerOperationClient(conf, certClient, secretKeyClient);
dnClient = new DNContainerOperationClient(conf, certClient, secretKeyClient);

ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "PipelineCommandHandlerThread-%d")
Expand Down Expand Up @@ -753,4 +754,8 @@ public DatanodeQueueMetrics getQueueMetrics() {
public ReconfigurationHandler getReconfigurationHandler() {
return reconfigurationHandler;
}

public DNContainerOperationClient getDnContainerOperationClientClient() {
return dnClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
Expand Down Expand Up @@ -555,7 +556,7 @@ ContainerCommandResponseProto handleCloseContainer(
return getSuccessResponse(request);
}

private void createContainerMerkleTree(Container container) {
public void createContainerMerkleTree(Container container) {
if (ContainerChecksumTreeManager.checksumFileExist(container)) {
return;
}
Expand Down Expand Up @@ -1075,7 +1076,7 @@ public void putBlockForClosedContainer(List<ContainerProtos.ChunkInfo> chunkInfo
// To be set from the Replica's BCSId
blockData.setBlockCommitSequenceId(blockCommitSequenceId);

blockManager.putBlock(kvContainer, blockData, false);
blockManager.putBlock(kvContainer, blockData, true);
ContainerProtos.BlockData blockDataProto = blockData.getProtoBufMessage();
final long numBytes = blockDataProto.getSerializedSize();
// Increment write stats for PutBlock after write.
Expand Down Expand Up @@ -1468,12 +1469,12 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container<?>
for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
reconcileChunk(kvContainer, containerData, tokenHelper, scmBlockSize, xceiverClient, entry);
}
updateContainerChecksum(containerData);
} finally {
dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
}
}

updateContainerChecksum(containerData);
long dataChecksum = updateContainerChecksum(containerData);
LOG.info("Checksum data for container {} is updated to {}", containerData.getContainerID(), dataChecksum);
containerData.setDataChecksum(dataChecksum);
Expand All @@ -1500,7 +1501,7 @@ private void handleMissingBlock(KeyValueContainer container, ContainerData conta
ContainerProtos.BlockMerkleTree missingBlock) throws IOException {
BlockID blockID = new BlockID(containerData.getContainerID(), missingBlock.getBlockID());
Token<OzoneBlockTokenIdentifier> blockToken = tokenHelper.getBlockToken(blockID, scmBlockSize);
// TODo: Cache the blockResponse to reuse it again.
// TODO: Cache the blockResponse to reuse it again.
ContainerProtos.GetBlockResponseProto blockResponse = ContainerProtocolCalls.getBlock(xceiverClient, blockID,
blockToken, new HashMap<>());
// TODO: Add BcsId in BlockMerkleTree to avoid this call
Expand All @@ -1511,7 +1512,9 @@ private void handleMissingBlock(KeyValueContainer container, ContainerData conta
for (ContainerProtos.ChunkInfo chunkInfoProto : chunksList) {
ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto, blockID, blockToken);
ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
writeChunkForClosedContainer(ChunkInfo.getFromProtoBuf(chunkInfoProto), blockID, chunkBuffer, container);
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container);
}

putBlockForClosedContainer(chunksList, container, BlockData.getFromProtoBuf(blockResponse.getBlockData()),
Expand Down Expand Up @@ -1553,7 +1556,9 @@ private void reconcileChunk(KeyValueContainer container, ContainerData container
if (offsets.contains(chunkInfoProto.getOffset())) {
ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto, blockID, blockToken);
ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
writeChunkForClosedContainer(ChunkInfo.getFromProtoBuf(chunkInfoProto), blockID, chunkBuffer, container);
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public long persistPutBlock(KeyValueContainer container,
// default blockCommitSequenceId for any block is 0. It the putBlock
// request is not coming via Ratis(for test scenarios), it will be 0.
// In such cases, we should overwrite the block as well
if ((bcsId != 0) && (bcsId <= containerBCSId)) {
if ((bcsId != 0) && (bcsId < containerBCSId)) {
// Since the blockCommitSequenceId stored in the db is greater than
// equal to blockCommitSequenceId to be updated, it means the putBlock
// transaction is reapplied in the ContainerStateMachine on restart.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,10 @@ public ReplicationServer getReplicationServer() {
return replicationServer;
}

public ContainerChecksumTreeManager getChecksumTreeManager() {
return checksumTreeManager;
}

public void compactDb() {
for (StorageVolume volume : volumeSet.getVolumesList()) {
HddsVolume hddsVolume = (HddsVolume) volume;
Expand Down
Loading

0 comments on commit 99de480

Please sign in to comment.