Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ public final class OmMultipartKeyInfo extends WithObjectID implements CopyObject
// This stores the schema version of the multipart key.
// 0 - Legacy Schema -> Uses the same table to store the multipart part info
// 1 - New Schema -> Uses a separate table to store the multipart part info
private final byte schemaVersion;
public static final int LEGACY_SCHEMA_VERSION = 0;
public static final int SPLIT_PARTS_SCHEMA_VERSION = 1;
private final int schemaVersion;

public static Codec<OmMultipartKeyInfo> getCodec() {
return CODEC;
Expand Down Expand Up @@ -258,10 +260,6 @@ public PartKeyInfoMap getPartKeyInfoMap() {
}

public void addPartKeyInfo(PartKeyInfo partKeyInfo) {
if (schemaVersion == 1) {
throw new IllegalStateException(
"PartKeyInfoMap is not supported for schemaVersion 1");
}
this.partKeyInfoMap = PartKeyInfoMap.put(partKeyInfo, partKeyInfoMap);
}

Expand All @@ -273,7 +271,7 @@ public ReplicationConfig getReplicationConfig() {
return replicationConfig;
}

public byte getSchemaVersion() {
public int getSchemaVersion() {
return schemaVersion;
}

Expand All @@ -295,7 +293,7 @@ public static class Builder extends WithObjectID.Builder<OmMultipartKeyInfo> {
private final AclListBuilder acls;
private final TreeMap<Integer, PartKeyInfo> partKeyInfoList;
private long parentID;
private byte schemaVersion;
private int schemaVersion;

public Builder() {
this.acls = AclListBuilder.empty();
Expand All @@ -314,10 +312,8 @@ public Builder(OmMultipartKeyInfo multipartKeyInfo) {
this.acls = AclListBuilder.of(multipartKeyInfo.acls);
this.partKeyInfoList = new TreeMap<>();

if (multipartKeyInfo.getSchemaVersion() == 0) {
for (PartKeyInfo partKeyInfo : multipartKeyInfo.partKeyInfoMap) {
this.partKeyInfoList.put(partKeyInfo.getPartNumber(), partKeyInfo);
}
for (PartKeyInfo partKeyInfo : multipartKeyInfo.partKeyInfoMap) {
this.partKeyInfoList.put(partKeyInfo.getPartNumber(), partKeyInfo);
}

this.parentID = multipartKeyInfo.parentID;
Expand Down Expand Up @@ -408,8 +404,8 @@ public Builder setParentID(long parentObjId) {
return this;
}

public Builder setSchemaVersion(byte schemaVersion) {
this.schemaVersion = schemaVersion;
public Builder setSchemaVersion(int schemaVersion) {
this.schemaVersion = validateAndConvertSchemaVersion(schemaVersion);
return this;
}

Expand All @@ -427,10 +423,11 @@ protected OmMultipartKeyInfo buildObject() {
public static Builder builderFromProto(
MultipartKeyInfo multipartKeyInfo) {
final SortedMap<Integer, PartKeyInfo> list = new TreeMap<>();
if (!multipartKeyInfo.hasSchemaVersion() || multipartKeyInfo.getSchemaVersion() == 0) {
multipartKeyInfo.getPartKeyInfoListList().forEach(partKeyInfo ->
list.put(partKeyInfo.getPartNumber(), partKeyInfo));
}
// Keep reading the embedded part list for both schema versions so MPU
// commit/complete/list flows stay backward compatible until the parts
// split-table write/read path is fully wired in all requests.
multipartKeyInfo.getPartKeyInfoListList().forEach(partKeyInfo ->
list.put(partKeyInfo.getPartNumber(), partKeyInfo));

final ReplicationConfig replicationConfig = ReplicationConfig.fromProto(
multipartKeyInfo.getType(),
Expand All @@ -455,7 +452,8 @@ public static Builder builderFromProto(
.setObjectID(multipartKeyInfo.getObjectID())
.setUpdateID(multipartKeyInfo.getUpdateID())
.setParentID(multipartKeyInfo.getParentID())
.setSchemaVersion((byte) multipartKeyInfo.getSchemaVersion());
.setSchemaVersion(validateAndConvertSchemaVersion(
multipartKeyInfo.getSchemaVersion()));
}

/**
Expand All @@ -473,11 +471,6 @@ public static OmMultipartKeyInfo getFromProto(
* @return MultipartKeyInfo
*/
public MultipartKeyInfo getProto() {
if (schemaVersion == 1 && partKeyInfoMap != null && partKeyInfoMap.size() > 0) {
throw new IllegalStateException(
"PartKeyInfoMap must be empty for schemaVersion 1");
}

MultipartKeyInfo.Builder builder = MultipartKeyInfo.newBuilder()
.setUploadID(uploadID)
.setCreationTime(creationTime)
Expand Down Expand Up @@ -507,12 +500,21 @@ public MultipartKeyInfo getProto() {
}

builder.addAllAcls(OzoneAclUtil.toProtobuf(acls));
if (schemaVersion == 0) {
builder.addAllPartKeyInfoList(partKeyInfoMap);
}
// Keep serializing the embedded part list for both schema versions for
// compatibility with existing MPU request flows.
builder.addAllPartKeyInfoList(partKeyInfoMap);
return builder.build();
}

private static int validateAndConvertSchemaVersion(int schemaVersion) {
if (schemaVersion != LEGACY_SCHEMA_VERSION
&& schemaVersion != SPLIT_PARTS_SCHEMA_VERSION) {
throw new IllegalArgumentException("Unsupported schemaVersion: "
+ schemaVersion + ". Expected one of [0, 1].");
}
return schemaVersion;
}

@Override
public String getObjectInfo() {
return getProto().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,31 +118,50 @@ public void distinctListOfParts() {
}

@Test
public void addPartKeyInfoRejectsSchemaVersionOne() {
public void addPartKeyInfoSupportsSchemaVersionOne() {
OmMultipartKeyInfo subject = createSubject()
.setSchemaVersion((byte) 1)
.setSchemaVersion(1)
.build();

assertThrows(IllegalStateException.class,
() -> subject.addPartKeyInfo(createPart(createKeyInfo()).build()));
subject.addPartKeyInfo(createPart(createKeyInfo()).build());
assertEquals(1, subject.getPartKeyInfoMap().size());
}

@Test
public void getProtoRejectsLegacyPartListForSchemaVersionOne() {
public void getProtoSupportsPartListForSchemaVersionOne() {
PartKeyInfo part = createPart(createKeyInfo()).build();
TreeMap<Integer, PartKeyInfo> legacyMap = new TreeMap<>();
legacyMap.put(part.getPartNumber(), part);

OmMultipartKeyInfo subject = new OmMultipartKeyInfo.Builder()
.setUploadID(UUID.randomUUID().toString())
.setCreationTime(Time.now())
.setSchemaVersion((byte) 1)
.setSchemaVersion(1)
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE))
.setPartKeyInfoList(legacyMap)
.build();

assertThrows(IllegalStateException.class, subject::getProto);
OzoneManagerProtocolProtos.MultipartKeyInfo proto = subject.getProto();
OmMultipartKeyInfo fromProto = OmMultipartKeyInfo.getFromProto(proto);
assertEquals(1, fromProto.getSchemaVersion());
assertEquals(1, fromProto.getPartKeyInfoMap().size());
}

@Test
public void builderFromProtoRejectsUnsupportedSchemaVersion() {
OmMultipartKeyInfo subject = createSubject()
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE))
.build();

OzoneManagerProtocolProtos.MultipartKeyInfo invalidProto = subject.getProto()
.toBuilder()
.setSchemaVersion(256)
.build();

assertThrows(IllegalArgumentException.class,
() -> OmMultipartKeyInfo.getFromProto(invalidProto));
}

private static OmMultipartKeyInfo.Builder createSubject() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1715,6 +1715,7 @@ message ServiceInfo {

message MultipartInfoInitiateRequest {
required KeyArgs keyArgs = 1;
optional uint32 schemaVersion = 2;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,14 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {

KeyArgs resolvedArgs = resolveBucketAndCheckKeyAcls(newKeyArgs.build(),
ozoneManager, ACLType.CREATE);
int schemaVersion = resolveMultipartSchemaVersion(multipartInfoInitiateRequest);
MultipartInfoInitiateRequest.Builder requestBuilder =
multipartInfoInitiateRequest.toBuilder()
.setKeyArgs(resolvedArgs)
.setSchemaVersion(schemaVersion);
return getOmRequest().toBuilder()
.setUserInfo(getUserInfo())
.setInitiateMultiPartUploadRequest(
multipartInfoInitiateRequest.toBuilder().setKeyArgs(resolvedArgs))
.setInitiateMultiPartUploadRequest(requestBuilder)
.build();
}

Expand Down Expand Up @@ -195,6 +199,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
replicationConfig)
.setObjectID(objectID)
.setUpdateID(transactionLogIndex)
.setSchemaVersion(
resolveMultipartSchemaVersion(multipartInfoInitiateRequest))
.build();

omKeyInfo = new OmKeyInfo.Builder()
Expand Down Expand Up @@ -284,6 +290,32 @@ protected void logResult(OzoneManager ozoneManager,
}
}

protected int resolveMultipartSchemaVersion(
MultipartInfoInitiateRequest multipartInfoInitiateRequest) {
if (!multipartInfoInitiateRequest.hasSchemaVersion()) {
return OmMultipartKeyInfo.LEGACY_SCHEMA_VERSION;
}
return (int) multipartInfoInitiateRequest.getSchemaVersion();
}

@RequestFeatureValidator(
conditions = ValidationCondition.CLUSTER_NEEDS_FINALIZATION,
processingPhase = RequestProcessingPhase.PRE_PROCESS,
requestType = Type.InitiateMultiPartUpload
)
public static OMRequest setSchemaVersionOnInitiateMultipartUpload(
OMRequest req, ValidationContext ctx) {
MultipartInfoInitiateRequest initiateRequest =
req.getInitiateMultiPartUploadRequest();

// Keep newly initiated MPUs on legacy schema until split parts-table
// write/read paths are fully implemented.
return req.toBuilder()
.setInitiateMultiPartUploadRequest(initiateRequest.toBuilder()
.setSchemaVersion(OmMultipartKeyInfo.LEGACY_SCHEMA_VERSION))
.build();
}

@RequestFeatureValidator(
conditions = ValidationCondition.CLUSTER_NEEDS_FINALIZATION,
processingPhase = RequestProcessingPhase.PRE_PROCESS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
.setObjectID(pathInfoFSO.getLeafNodeObjectId())
.setUpdateID(transactionLogIndex)
.setParentID(pathInfoFSO.getLastKnownParentId())
.setSchemaVersion(
resolveMultipartSchemaVersion(multipartInfoInitiateRequest))
.build();

omKeyInfo = new OmKeyInfo.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,18 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
openKey + "entry is not found in the openKey table",
KEY_NOT_FOUND);
}

if (multipartKeyInfo == null) {
// This can occur when user started uploading part by the time commit
// of that part happens, in between the user might have requested
// abort multipart upload. If we just throw exception, then the data
// will not be garbage collected, so move this part to delete table
// and throw error.
throw new OMException("No such Multipart upload is with specified " +
"uploadId " + uploadID,
OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
}

// Add/Update user defined metadata.
// Set the UpdateID to current transactionLogIndex
omKeyInfo = omKeyInfo.toBuilder()
Expand All @@ -181,18 +193,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
int partNumber = keyArgs.getMultipartNumber();
partName = getPartName(ozoneKey, uploadID, partNumber);

if (multipartKeyInfo == null) {
// This can occur when user started uploading part by the time commit
// of that part happens, in between the user might have requested
// abort multipart upload. If we just throw exception, then the data
// will not be garbage collected, so move this part to delete table
// and throw error
// Move this part to delete table.
throw new OMException("No such Multipart upload is with specified " +
"uploadId " + uploadID,
OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
}

oldPartKeyInfo = multipartKeyInfo.getPartKeyInfo(partNumber);

// Build this multipart upload part info.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
// Conditional write validation (If-None-Match / If-Match).
// BUCKET_LOCK is held, so validation and commit are atomic.
// Only 412 PreconditionFailed is possible; 409 Conflict cannot occur.
OmKeyInfo existingKeyInfo = omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
OmKeyInfo existingKeyInfo =
omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
validateAtomicRewrite(existingKeyInfo, keyArgs);
validateIfMatchETag(keyArgs, existingKeyInfo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ public enum OMLayoutFeature implements LayoutFeature {
QUOTA(6, "Ozone quota re-calculate"),
HBASE_SUPPORT(7, "Full support of hsync, lease recovery and listOpenFiles APIs for HBase"),
DELEGATION_TOKEN_SYMMETRIC_SIGN(8, "Delegation token signed by symmetric key"),
SNAPSHOT_DEFRAG(9, "Supporting defragmentation of snapshot");
SNAPSHOT_DEFRAG(9, "Supporting defragmentation of snapshot"),

MPU_PARTS_TABLE_SPLIT(10, "Split multipart table into separate table for parts and key");

/////////////////////////////// /////////////////////////////

Expand Down
Loading