HDDS-10377. Allow datanodes to do chunk level modifications to closed containers. #7111

Merged
@@ -59,6 +59,7 @@
import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumByteBuffer;
import org.apache.hadoop.ozone.common.ChecksumByteBufferFactory;
@@ -109,6 +110,7 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNCLOSED_CONTAINER_IO;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getEchoResponse;
@@ -970,6 +972,53 @@ ContainerCommandResponseProto handleWriteChunk(
    return getWriteChunkResponseSuccess(request, blockDataProto);
  }

  /**
   * Handle Write Chunk operation for a closed container. Calls ChunkManager
   * to process the request.
   */
  private void writeChunkForClosedContainer(ChunkInfo chunkInfo, BlockID blockID,
      ChunkBuffer data, KeyValueContainer kvContainer,
      BlockData blockData) {

    try {
      Preconditions.checkNotNull(chunkInfo);
      Preconditions.checkNotNull(data);
      long writeChunkStartTime = Time.monotonicNowNanos();
      checkContainerClose(kvContainer);

      DispatcherContext dispatcherContext = DispatcherContext.getHandleWriteChunk();

      // Set CHUNK_OVERWRITE to overwrite the existing chunk.
      chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
      chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
          dispatcherContext);

      // Increment write stats for WriteChunk after write.
      metrics.incContainerBytesStats(Type.WriteChunk, chunkInfo.getLen());
Reviewer (Contributor): Add new metrics for this; writes to closed containers are a separate processing path that we need to track independently of the normal WriteChunk path.
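A minimal sketch of what such a dedicated metric could look like, assuming a counter were added to ContainerMetrics; the field and method names below are hypothetical and not part of this PR (they rely only on the standard Hadoop metrics2 @Metric annotation and MutableCounterLong):

  // Hypothetical addition to ContainerMetrics, illustrative names only:
  // a counter dedicated to chunk writes on closed containers, kept separate
  // from the normal WriteChunk stats.
  @Metric("Bytes written to closed containers")
  private MutableCounterLong closedContainerWriteBytes;

  public void incClosedContainerWriteBytes(long bytes) {
    closedContainerWriteBytes.incr(bytes);
  }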

      metrics.incContainerOpsLatencies(Type.WriteChunk, Time.monotonicNowNanos() - writeChunkStartTime);
Reviewer (Contributor): Might be cleaner to use, or introduce something similar to, org.apache.hadoop.ozone.util.MetricUtil#captureLatencyNs(org.apache.hadoop.metrics2.lib.MutableRate, org.apache.ratis.util.function.CheckedSupplier<T,E>) to capture the latency.

Author (Member): We can do that, but I'm reusing the existing implementation here to avoid code duplication.
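For reference, a rough sketch of the suggested pattern, assuming a MutableRate for this path were exposed by ContainerMetrics; getWriteChunkToClosedContainerNs is a hypothetical accessor, not an existing API:

      // Sketch only: wrap the write in MetricUtil.captureLatencyNs so the
      // latency is recorded against a dedicated (hypothetical) MutableRate.
      MetricUtil.captureLatencyNs(metrics.getWriteChunkToClosedContainerNs(),
          () -> {
            chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
                dispatcherContext);
            return null;
          });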


      if (blockData != null) {
        long putBlockStartTime = Time.monotonicNowNanos();
        // Add ChunkInfo to BlockData to be persisted in RocksDB.
        List<ContainerProtos.ChunkInfo> chunks = blockData.getChunks();
        chunks.add(chunkInfo.getProtoBufMessage());
        blockData.setChunks(chunks);
Reviewer (Contributor): I think this list is implicitly sorted by offset at the time of writing. It's possible the chunk we are patching is in the middle, so this will need to be re-sorted. This should also show up once tests are added that read data patched with this method.
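A minimal sketch of the re-sort the reviewer asks for, assuming java.util.ArrayList and java.util.Comparator are imported; the same-offset replacement handling is illustrative and not part of this diff:

        // Sketch: copy the chunk list, drop any existing entry at the same
        // offset, add the patched chunk, and keep the list ordered by offset.
        List<ContainerProtos.ChunkInfo> chunks = new ArrayList<>(blockData.getChunks());
        chunks.removeIf(c -> c.getOffset() == chunkInfo.getOffset());
        chunks.add(chunkInfo.getProtoBufMessage());
        chunks.sort(Comparator.comparingLong(ContainerProtos.ChunkInfo::getOffset));
        blockData.setChunks(chunks);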


        // To be set from the replica's BCSId.
        blockData.setBlockCommitSequenceId(dispatcherContext.getLogIndex());

        blockManager.putBlock(kvContainer, blockData);
        ContainerProtos.BlockData blockDataProto = blockData.getProtoBufMessage();
        final long numBytes = blockDataProto.getSerializedSize();
        // Increment write stats for PutBlock after write.
        metrics.incContainerBytesStats(Type.PutBlock, numBytes);
        metrics.incContainerOpsLatencies(Type.PutBlock, Time.monotonicNowNanos() - putBlockStartTime);
      }
    } catch (IOException ex) {
      LOG.error("Write Chunk failed for closed container", ex);
    }
  }
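For context, a hedged sketch of how a caller inside KeyValueHandler might use the helper above; the repair entry point shown (repairChunkOnClosedContainer) is hypothetical and not part of this diff:

  // Hypothetical caller, for illustration only: patch one chunk of a closed
  // container and persist the chunk list / BCSId taken from a healthy replica.
  private void repairChunkOnClosedContainer(KeyValueContainer kvContainer,
      BlockID blockID, ChunkInfo chunkInfo, ChunkBuffer data,
      BlockData replicaBlockData) {
    // A non-null BlockData makes the helper also update block metadata in
    // RocksDB; passing null would rewrite only the chunk data on disk.
    writeChunkForClosedContainer(chunkInfo, blockID, data, kvContainer,
        replicaBlockData);
  }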

  /**
   * Handle Put Small File operation. Writes the chunk and associated key
   * using a single RPC. Calls BlockManager and ChunkManager to process the
@@ -1146,6 +1195,38 @@ private void checkContainerOpen(KeyValueContainer kvContainer)
    throw new StorageContainerException(msg, result);
  }

  /**
   * Check if the container is closed. Throw an exception otherwise.
   * @param kvContainer the container to check
   * @throws StorageContainerException if the container is not in a closed state
   */
  private void checkContainerClose(KeyValueContainer kvContainer)
      throws StorageContainerException {

    final State containerState = kvContainer.getContainerState();
    if (containerState == State.QUASI_CLOSED || containerState == State.CLOSED
        || containerState == State.UNHEALTHY) {
      return;
    }

    final ContainerProtos.Result result;
    switch (containerState) {
    case OPEN:
    case CLOSING:
    case RECOVERING:
      result = UNCLOSED_CONTAINER_IO;
      break;
    case INVALID:
      result = INVALID_CONTAINER_STATE;
      break;
    default:
      result = CONTAINER_INTERNAL_ERROR;
    }
    String msg = "Requested operation not allowed as ContainerState is " +
        containerState;
    throw new StorageContainerException(msg, result);
Reviewer (Contributor): Can this method just return a boolean? Throwing an exception to check the state of a container seems heavy-handed. It makes sense for writeChunkForClosedContainer to throw, since the contract of that method is that it must only be called for closed containers, but this method is just a check on the state.

Author (Member): This method is used in both putBlockForClosedContainer and writeChunkForClosedContainer; we shouldn't allow putBlock on a non-closed container either, hence the exception. We could change it to return a boolean and still fail the request, but the end result would be the same.
}
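A minimal sketch of the boolean alternative discussed above, for comparison; the name isContainerClosed is hypothetical and the PR keeps the exception-throwing form:

  // Sketch only: boolean variant of the check; callers would have to fail the
  // request themselves when this returns false.
  private boolean isContainerClosed(KeyValueContainer kvContainer) {
    final State containerState = kvContainer.getContainerState();
    return containerState == State.QUASI_CLOSED
        || containerState == State.CLOSED
        || containerState == State.UNHEALTHY;
  }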

  @Override
  public Container importContainer(ContainerData originalContainerData,
      final InputStream rawContainerStream,