Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-10377. Allow datanodes to do chunk level modifications to closed containers. #7111

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,45 @@ public static XceiverClientReply writeChunkAsync(
return xceiverClient.sendCommandAsync(request);
}

/**
 * Calls the container protocol to write a chunk for a closed container.
 * Builds a WriteChunkForClosedContainer command addressed to the first node
 * of the client's pipeline and sends it asynchronously.
 *
 * @param xceiverClient client to perform call
 * @param chunk information about chunk to write
 * @param blockID ID of the block
 * @param data the data of the chunk to write
 * @param tokenString serialized block token; may be null, in which case no
 *     encoded token is attached to the request
 * @param replicationIndex replica index recorded on the datanode block ID
 * @return reply for the asynchronously sent command
 * @throws IOException if there is an I/O error while performing the call
 * @throws ExecutionException if the asynchronous send fails
 * @throws InterruptedException if the thread is interrupted while sending
 */
public static XceiverClientReply writeChunkForClosedContainer(
    XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
    ByteString data, String tokenString, int replicationIndex)
    throws IOException, ExecutionException, InterruptedException {
  WriteChunkRequestProto.Builder writeChunkRequest =
      WriteChunkRequestProto.newBuilder()
          .setBlockID(DatanodeBlockID.newBuilder()
              .setContainerID(blockID.getContainerID())
              .setLocalID(blockID.getLocalID())
              .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId())
              .setReplicaIndex(replicationIndex)
              .build())
          .setChunkData(chunk)
          .setData(data);
  // The command is routed to the first node of the (single-node) pipeline.
  String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
  ContainerCommandRequestProto.Builder builder =
      ContainerCommandRequestProto.newBuilder()
          .setCmdType(Type.WriteChunkForClosedContainer)
          .setContainerID(blockID.getContainerID())
          .setDatanodeUuid(id)
          .setWriteChunk(writeChunkRequest);

  if (tokenString != null) {
    builder.setEncodedToken(tokenString);
  }
  ContainerCommandRequestProto request = builder.build();
  return xceiverClient.sendCommandAsync(request);
}

/**
* Allows writing a small file using single RPC. This takes the container
* name, block name and data to write sends all that data to the container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public enum DNAction implements AuditAction {
STREAM_INIT,
FINALIZE_BLOCK,
ECHO,
GET_CONTAINER_MERKLE_TREE;
GET_CONTAINER_MERKLE_TREE,
WRITE_CHUNK_FOR_CLOSED_CONTAINER;

@Override
public String getAction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package org.apache.hadoop.ozone.container.checksum;

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.container.ContainerID;
Expand All @@ -44,6 +46,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import static org.apache.hadoop.ozone.container.common.helpers.TokenHelper.encode;

Expand Down Expand Up @@ -104,6 +107,21 @@ public ByteString getContainerMerkleTree(long containerId, DatanodeDetails dn)
}
}

/**
 * Writes a chunk to a closed container replica on the given datanode, using
 * a freshly acquired client over a single-node pipeline. The client is
 * released in all cases once the call returns.
 *
 * @param chunkInfo information about the chunk to write
 * @param blockID ID of the block the chunk belongs to
 * @param data chunk payload
 * @param replicationIndex replica index recorded on the datanode block ID
 * @param dn datanode hosting the closed container replica
 * @return reply for the asynchronously sent write command
 * @throws IOException if acquiring the client or performing the call fails
 * @throws ExecutionException if the asynchronous send fails
 * @throws InterruptedException if interrupted while sending
 */
public XceiverClientReply writeChunkForClosedContainer(ContainerProtos.ChunkInfo chunkInfo, BlockID blockID,
    ByteString data, int replicationIndex, DatanodeDetails dn)
    throws IOException, ExecutionException, InterruptedException {
  XceiverClientSpi xceiverClient = this.xceiverClientManager.acquireClient(createSingleNodePipeline(dn));
  // TODO(review): leftover note "Set OzoneConsts.CHUNK_OVERWRITE" — confirm
  // whether the overwrite flag must be set on chunkInfo before this write.
  try {
    String containerToken = encode(tokenHelper.getContainerToken(
        ContainerID.valueOf(blockID.getContainerID())));
    return ContainerProtocolCalls.writeChunkForClosedContainer(
        xceiverClient, chunkInfo, blockID, data, containerToken, replicationIndex);
  } finally {
    // Second argument false — presumably "do not invalidate the cached
    // client"; verify against XceiverClientManager#releaseClient.
    this.xceiverClientManager.releaseClient(xceiverClient, false);
  }
}

public static Pipeline createSingleNodePipeline(DatanodeDetails dn) {
return Pipeline.newBuilder()
.setNodes(ImmutableList.of(dn))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,7 @@ private static DNAction getAuditAction(Type cmdType) {
case FinalizeBlock : return DNAction.FINALIZE_BLOCK;
case Echo : return DNAction.ECHO;
case GetContainerMerkleTree : return DNAction.GET_CONTAINER_MERKLE_TREE;
case WriteChunkForClosedContainer: return DNAction.WRITE_CHUNK_FOR_CLOSED_CONTAINER;
default :
LOG.debug("Invalid command type - {}", cmdType);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNCLOSED_CONTAINER_IO;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getEchoResponse;
Expand Down Expand Up @@ -299,6 +300,8 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler,
return handler.handleEcho(request, kvContainer);
case GetContainerMerkleTree:
return handler.handleGetContainerMerkleTree(request, kvContainer);
case WriteChunkForClosedContainer:
return handler.handleWriteChunkForClosedContainer(request, kvContainer);
default:
return null;
}
Expand Down Expand Up @@ -970,6 +973,47 @@ ContainerCommandResponseProto handleWriteChunk(
return getWriteChunkResponseSuccess(request, blockDataProto);
}

/**
 * Handles a Write Chunk request targeting an already-closed container.
 * Validates the request shape and the container state, then delegates the
 * actual chunk write to the ChunkManager.
 */
ContainerCommandResponseProto handleWriteChunkForClosedContainer(
    ContainerCommandRequestProto request, KeyValueContainer kvContainer) {

  if (!request.hasWriteChunk()) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Malformed Write Chunk request. trace ID: {}",
          request.getTraceID());
    }
    return malformedRequest(request);
  }

  try {
    // Rejects the request unless the container has already been closed.
    checkContainerClose(kvContainer);

    final WriteChunkRequestProto writeChunkProto = request.getWriteChunk();
    final BlockID chunkBlockID =
        BlockID.getFromProtobuf(writeChunkProto.getBlockID());
    final ChunkInfo info =
        ChunkInfo.getFromProtoBuf(writeChunkProto.getChunkData());
    Preconditions.checkNotNull(info);

    // Wrap the payload as read-only buffers; no copy is made here.
    final ChunkBuffer payload =
        ChunkBuffer.wrap(writeChunkProto.getData().asReadOnlyByteBufferList());

    chunkManager.writeChunk(kvContainer, chunkBlockID, info, payload,
        DispatcherContext.getHandleWriteChunk());
  } catch (StorageContainerException ex) {
    return ContainerUtils.logAndReturnError(LOG, ex, request);
  } catch (IOException ex) {
    return ContainerUtils.logAndReturnError(LOG,
        new StorageContainerException("Write Chunk failed", ex, IO_EXCEPTION),
        request);
  }
  return getSuccessResponse(request);
}

/**
* Handle Put Small File operation. Writes the chunk and associated key
* using a single RPC. Calls BlockManager and ChunkManager to process the
Expand Down Expand Up @@ -1146,6 +1190,38 @@ private void checkContainerOpen(KeyValueContainer kvContainer)
throw new StorageContainerException(msg, result);
}

/**
* Check if container is Closed. Throw exception otherwise.
* @param kvContainer
* @throws StorageContainerException
*/
private void checkContainerClose(KeyValueContainer kvContainer)
throws StorageContainerException {

final State containerState = kvContainer.getContainerState();
if (containerState == State.QUASI_CLOSED || containerState == State.CLOSED
|| containerState == State.UNHEALTHY) {
aswinshakil marked this conversation as resolved.
Show resolved Hide resolved
return;
}

final ContainerProtos.Result result;
switch (containerState) {
case OPEN:
case CLOSING:
case RECOVERING:
result = UNCLOSED_CONTAINER_IO;
break;
case INVALID:
result = INVALID_CONTAINER_STATE;
break;
default:
result = CONTAINER_INTERNAL_ERROR;
}
String msg = "Requested operation not allowed as ContainerState is " +
containerState;
throw new StorageContainerException(msg, result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this method just return a bool? Throwing an exception to check the state of a container seems heavy-handed. It makes sense for writeChunkForClosedContainer to throw an exception, as the contract for that method is that it should be called only for closed containers. This method is just a check on the state.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is used in both putBlockForClosedContainer and writeChunkForClosedContainer. We shouldn't allow putBlock for a non-closed container either, hence the exception. We could change it to return a boolean and still fail the request, but the end result would be the same.

}

@Override
public Container importContainer(ContainerData originalContainerData,
final InputStream rawContainerStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ enum Type {
FinalizeBlock = 21;
Echo = 22;
GetContainerMerkleTree = 23;
WriteChunkForClosedContainer = 24;
}


Expand Down