diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java index 88036d3e66a6..db29582c9c5f 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java @@ -87,7 +87,9 @@ public final class OmMultipartKeyInfo extends WithObjectID implements CopyObject // This stores the schema version of the multipart key. // 0 - Legacy Schema -> Uses the same table to store the multipart part info // 1 - New Schema -> Uses a separate table to store the multipart part info - private final byte schemaVersion; + public static final int LEGACY_SCHEMA_VERSION = 0; + public static final int SPLIT_PARTS_SCHEMA_VERSION = 1; + private final int schemaVersion; public static Codec getCodec() { return CODEC; @@ -258,10 +260,6 @@ public PartKeyInfoMap getPartKeyInfoMap() { } public void addPartKeyInfo(PartKeyInfo partKeyInfo) { - if (schemaVersion == 1) { - throw new IllegalStateException( - "PartKeyInfoMap is not supported for schemaVersion 1"); - } this.partKeyInfoMap = PartKeyInfoMap.put(partKeyInfo, partKeyInfoMap); } @@ -273,7 +271,7 @@ public ReplicationConfig getReplicationConfig() { return replicationConfig; } - public byte getSchemaVersion() { + public int getSchemaVersion() { return schemaVersion; } @@ -295,7 +293,7 @@ public static class Builder extends WithObjectID.Builder { private final AclListBuilder acls; private final TreeMap partKeyInfoList; private long parentID; - private byte schemaVersion; + private int schemaVersion; public Builder() { this.acls = AclListBuilder.empty(); @@ -314,10 +312,8 @@ public Builder(OmMultipartKeyInfo multipartKeyInfo) { this.acls = AclListBuilder.of(multipartKeyInfo.acls); this.partKeyInfoList = new TreeMap<>(); - if (multipartKeyInfo.getSchemaVersion() == 0) { - for (PartKeyInfo partKeyInfo : multipartKeyInfo.partKeyInfoMap) { - this.partKeyInfoList.put(partKeyInfo.getPartNumber(), partKeyInfo); - } + for (PartKeyInfo partKeyInfo : multipartKeyInfo.partKeyInfoMap) { + this.partKeyInfoList.put(partKeyInfo.getPartNumber(), partKeyInfo); } this.parentID = multipartKeyInfo.parentID; @@ -408,8 +404,8 @@ public Builder setParentID(long parentObjId) { return this; } - public Builder setSchemaVersion(byte schemaVersion) { - this.schemaVersion = schemaVersion; + public Builder setSchemaVersion(int schemaVersion) { + this.schemaVersion = validateAndConvertSchemaVersion(schemaVersion); return this; } @@ -427,10 +423,11 @@ protected OmMultipartKeyInfo buildObject() { public static Builder builderFromProto( MultipartKeyInfo multipartKeyInfo) { final SortedMap list = new TreeMap<>(); - if (!multipartKeyInfo.hasSchemaVersion() || multipartKeyInfo.getSchemaVersion() == 0) { - multipartKeyInfo.getPartKeyInfoListList().forEach(partKeyInfo -> - list.put(partKeyInfo.getPartNumber(), partKeyInfo)); - } + // Keep reading the embedded part list for both schema versions so MPU + // commit/complete/list flows stay backward compatible until the parts + // split-table write/read path is fully wired in all requests. + multipartKeyInfo.getPartKeyInfoListList().forEach(partKeyInfo -> + list.put(partKeyInfo.getPartNumber(), partKeyInfo)); final ReplicationConfig replicationConfig = ReplicationConfig.fromProto( multipartKeyInfo.getType(), @@ -455,7 +452,8 @@ public static Builder builderFromProto( .setObjectID(multipartKeyInfo.getObjectID()) .setUpdateID(multipartKeyInfo.getUpdateID()) .setParentID(multipartKeyInfo.getParentID()) - .setSchemaVersion((byte) multipartKeyInfo.getSchemaVersion()); + .setSchemaVersion(validateAndConvertSchemaVersion( + multipartKeyInfo.getSchemaVersion())); } /** @@ -473,11 +471,6 @@ public static OmMultipartKeyInfo getFromProto( * @return MultipartKeyInfo */ public MultipartKeyInfo getProto() { - if (schemaVersion == 1 && partKeyInfoMap != null && partKeyInfoMap.size() > 0) { - throw new IllegalStateException( - "PartKeyInfoMap must be empty for schemaVersion 1"); - } - MultipartKeyInfo.Builder builder = MultipartKeyInfo.newBuilder() .setUploadID(uploadID) .setCreationTime(creationTime) @@ -507,12 +500,21 @@ public MultipartKeyInfo getProto() { } builder.addAllAcls(OzoneAclUtil.toProtobuf(acls)); - if (schemaVersion == 0) { - builder.addAllPartKeyInfoList(partKeyInfoMap); - } + // Keep serializing the embedded part list for both schema versions for + // compatibility with existing MPU request flows. + builder.addAllPartKeyInfoList(partKeyInfoMap); return builder.build(); } + private static int validateAndConvertSchemaVersion(int schemaVersion) { + if (schemaVersion != LEGACY_SCHEMA_VERSION + && schemaVersion != SPLIT_PARTS_SCHEMA_VERSION) { + throw new IllegalArgumentException("Unsupported schemaVersion: " + + schemaVersion + ". Expected one of [0, 1]."); + } + return schemaVersion; + } + @Override public String getObjectInfo() { return getProto().toString(); diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmMultipartKeyInfo.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmMultipartKeyInfo.java index 3a5658dd5bde..661ccaa1077d 100644 --- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmMultipartKeyInfo.java +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmMultipartKeyInfo.java @@ -118,17 +118,17 @@ public void distinctListOfParts() { } @Test - public void addPartKeyInfoRejectsSchemaVersionOne() { + public void addPartKeyInfoSupportsSchemaVersionOne() { OmMultipartKeyInfo subject = createSubject() - .setSchemaVersion((byte) 1) + .setSchemaVersion(1) .build(); - assertThrows(IllegalStateException.class, - () -> subject.addPartKeyInfo(createPart(createKeyInfo()).build())); + subject.addPartKeyInfo(createPart(createKeyInfo()).build()); + assertEquals(1, subject.getPartKeyInfoMap().size()); } @Test - public void getProtoRejectsLegacyPartListForSchemaVersionOne() { + public void getProtoSupportsPartListForSchemaVersionOne() { PartKeyInfo part = createPart(createKeyInfo()).build(); TreeMap legacyMap = new TreeMap<>(); legacyMap.put(part.getPartNumber(), part); @@ -136,13 +136,32 @@ public void getProtoRejectsLegacyPartListForSchemaVersionOne() { OmMultipartKeyInfo subject = new OmMultipartKeyInfo.Builder() .setUploadID(UUID.randomUUID().toString()) .setCreationTime(Time.now()) - .setSchemaVersion((byte) 1) + .setSchemaVersion(1) .setReplicationConfig(StandaloneReplicationConfig.getInstance( HddsProtos.ReplicationFactor.ONE)) .setPartKeyInfoList(legacyMap) .build(); - assertThrows(IllegalStateException.class, subject::getProto); + OzoneManagerProtocolProtos.MultipartKeyInfo proto = subject.getProto(); + OmMultipartKeyInfo fromProto = OmMultipartKeyInfo.getFromProto(proto); + assertEquals(1, fromProto.getSchemaVersion()); + assertEquals(1, fromProto.getPartKeyInfoMap().size()); + } + + @Test + public void builderFromProtoRejectsUnsupportedSchemaVersion() { + OmMultipartKeyInfo subject = createSubject() + .setReplicationConfig(StandaloneReplicationConfig.getInstance( + HddsProtos.ReplicationFactor.ONE)) + .build(); + + OzoneManagerProtocolProtos.MultipartKeyInfo invalidProto = subject.getProto() + .toBuilder() + .setSchemaVersion(256) + .build(); + + assertThrows(IllegalArgumentException.class, + () -> OmMultipartKeyInfo.getFromProto(invalidProto)); } private static OmMultipartKeyInfo.Builder createSubject() { diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index 22c71cc29852..4e30ed9f530c 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -1715,6 +1715,7 @@ message ServiceInfo { message MultipartInfoInitiateRequest { required KeyArgs keyArgs = 1; + optional uint32 schemaVersion = 2; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java index 22f470c80c7b..d2128b2b39b5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java @@ -100,10 +100,14 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { KeyArgs resolvedArgs = resolveBucketAndCheckKeyAcls(newKeyArgs.build(), ozoneManager, ACLType.CREATE); + int schemaVersion = resolveMultipartSchemaVersion(multipartInfoInitiateRequest); + MultipartInfoInitiateRequest.Builder requestBuilder = + multipartInfoInitiateRequest.toBuilder() + .setKeyArgs(resolvedArgs) + .setSchemaVersion(schemaVersion); return getOmRequest().toBuilder() .setUserInfo(getUserInfo()) - .setInitiateMultiPartUploadRequest( - multipartInfoInitiateRequest.toBuilder().setKeyArgs(resolvedArgs)) + .setInitiateMultiPartUploadRequest(requestBuilder) .build(); } @@ -195,6 +199,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut replicationConfig) .setObjectID(objectID) .setUpdateID(transactionLogIndex) + .setSchemaVersion( + resolveMultipartSchemaVersion(multipartInfoInitiateRequest)) .build(); omKeyInfo = new OmKeyInfo.Builder() @@ -284,6 +290,32 @@ protected void logResult(OzoneManager ozoneManager, } } + protected int resolveMultipartSchemaVersion( + MultipartInfoInitiateRequest multipartInfoInitiateRequest) { + if (!multipartInfoInitiateRequest.hasSchemaVersion()) { + return OmMultipartKeyInfo.LEGACY_SCHEMA_VERSION; + } + return (int) multipartInfoInitiateRequest.getSchemaVersion(); + } + + @RequestFeatureValidator( + conditions = ValidationCondition.CLUSTER_NEEDS_FINALIZATION, + processingPhase = RequestProcessingPhase.PRE_PROCESS, + requestType = Type.InitiateMultiPartUpload + ) + public static OMRequest setSchemaVersionOnInitiateMultipartUpload( + OMRequest req, ValidationContext ctx) { + MultipartInfoInitiateRequest initiateRequest = + req.getInitiateMultiPartUploadRequest(); + + // Keep newly initiated MPUs on legacy schema until split parts-table + // write/read paths are fully implemented. + return req.toBuilder() + .setInitiateMultiPartUploadRequest(initiateRequest.toBuilder() + .setSchemaVersion(OmMultipartKeyInfo.LEGACY_SCHEMA_VERSION)) + .build(); + } + @RequestFeatureValidator( conditions = ValidationCondition.CLUSTER_NEEDS_FINALIZATION, processingPhase = RequestProcessingPhase.PRE_PROCESS, diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java index 919491d70499..eb4d21d608b2 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java @@ -168,6 +168,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut .setObjectID(pathInfoFSO.getLeafNodeObjectId()) .setUpdateID(transactionLogIndex) .setParentID(pathInfoFSO.getLastKnownParentId()) + .setSchemaVersion( + resolveMultipartSchemaVersion(multipartInfoInitiateRequest)) .build(); omKeyInfo = new OmKeyInfo.Builder() diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java index ac123ff680ac..3bd3abbe8b81 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java @@ -161,6 +161,18 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut openKey + "entry is not found in the openKey table", KEY_NOT_FOUND); } + + if (multipartKeyInfo == null) { + // This can occur when user started uploading part by the time commit + // of that part happens, in between the user might have requested + // abort multipart upload. If we just throw exception, then the data + // will not be garbage collected, so move this part to delete table + // and throw error. + throw new OMException("No such Multipart upload is with specified " + + "uploadId " + uploadID, + OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); + } + // Add/Update user defined metadata. // Set the UpdateID to current transactionLogIndex omKeyInfo = omKeyInfo.toBuilder() @@ -181,18 +193,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut int partNumber = keyArgs.getMultipartNumber(); partName = getPartName(ozoneKey, uploadID, partNumber); - if (multipartKeyInfo == null) { - // This can occur when user started uploading part by the time commit - // of that part happens, in between the user might have requested - // abort multipart upload. If we just throw exception, then the data - // will not be garbage collected, so move this part to delete table - // and throw error - // Move this part to delete table. - throw new OMException("No such Multipart upload is with specified " + - "uploadId " + uploadID, - OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); - } - oldPartKeyInfo = multipartKeyInfo.getPartKeyInfo(partNumber); // Build this multipart upload part info. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java index 179eb87aabae..a692af65db38 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java @@ -272,7 +272,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut // Conditional write validation (If-None-Match / If-Match). // BUCKET_LOCK is held, so validation and commit are atomic. // Only 412 PreconditionFailed is possible; 409 Conflict cannot occur. - OmKeyInfo existingKeyInfo = omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey); + OmKeyInfo existingKeyInfo = + omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey); validateAtomicRewrite(existingKeyInfo, keyArgs); validateIfMatchETag(keyArgs, existingKeyInfo); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java index ef99b453b7f0..c693a719e138 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java @@ -44,7 +44,9 @@ public enum OMLayoutFeature implements LayoutFeature { QUOTA(6, "Ozone quota re-calculate"), HBASE_SUPPORT(7, "Full support of hsync, lease recovery and listOpenFiles APIs for HBase"), DELEGATION_TOKEN_SYMMETRIC_SIGN(8, "Delegation token signed by symmetric key"), - SNAPSHOT_DEFRAG(9, "Supporting defragmentation of snapshot"); + SNAPSHOT_DEFRAG(9, "Supporting defragmentation of snapshot"), + + MPU_PARTS_TABLE_SPLIT(10, "Split multipart table into separate table for parts and key"); /////////////////////////////// ///////////////////////////// diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java index 742d17a87b06..58435f50230e 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.HashMap; @@ -32,8 +33,10 @@ import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.junit.jupiter.api.Test; @@ -113,6 +116,67 @@ public void testValidateAndUpdateCache() throws Exception { } + @Test + public void testValidateAndUpdateCacheSetsSchemaVersionZeroBeforeFinalization() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager, getBucketLayout()); + + OMRequest modifiedRequest = doPreExecuteInitiateMPU(volumeName, + bucketName, keyName); + S3InitiateMultipartUploadRequest request = + getS3InitiateMultipartUploadReq(modifiedRequest); + + OMClientResponse response = + request.validateAndUpdateCache(ozoneManager, 100L); + assertEquals(OzoneManagerProtocolProtos.Status.OK, + response.getOMResponse().getStatus()); + + String multipartKey = getMultipartKey(volumeName, bucketName, keyName, + modifiedRequest.getInitiateMultiPartUploadRequest() + .getKeyArgs().getMultipartUploadID()); + OmMultipartKeyInfo multipartKeyInfo = omMetadataManager + .getMultipartInfoTable().get(multipartKey); + assertNotNull(multipartKeyInfo); + assertEquals(0, multipartKeyInfo.getSchemaVersion()); + } + + @Test + public void testValidateAndUpdateCacheKeepsSchemaVersionZeroAfterFinalization() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + when(ozoneManager.getVersionManager().getMetadataLayoutVersion()) + .thenReturn(OMLayoutFeature.MPU_PARTS_TABLE_SPLIT.layoutVersion()); + + OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager, getBucketLayout()); + + OMRequest modifiedRequest = doPreExecuteInitiateMPU(volumeName, + bucketName, keyName); + S3InitiateMultipartUploadRequest request = + getS3InitiateMultipartUploadReq(modifiedRequest); + + OMClientResponse response = + request.validateAndUpdateCache(ozoneManager, 100L); + assertEquals(OzoneManagerProtocolProtos.Status.OK, + response.getOMResponse().getStatus()); + + String multipartKey = getMultipartKey(volumeName, bucketName, keyName, + modifiedRequest.getInitiateMultiPartUploadRequest() + .getKeyArgs().getMultipartUploadID()); + OmMultipartKeyInfo multipartKeyInfo = omMetadataManager + .getMultipartInfoTable().get(multipartKey); + assertNotNull(multipartKeyInfo); + assertEquals(0, multipartKeyInfo.getSchemaVersion()); + } + @Test public void testValidateAndUpdateCacheWithBucketNotFound() throws Exception { String volumeName = UUID.randomUUID().toString(); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java index 15ed924cc408..3ee32e0f4b55 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java @@ -34,6 +34,8 @@ import java.util.Map; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.cache.CacheKey; +import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.audit.AuditLogger; import org.apache.hadoop.ozone.audit.AuditMessage; import org.apache.hadoop.ozone.om.IOmMetadataReader; @@ -47,8 +49,10 @@ import org.apache.hadoop.ozone.om.ResolvedBucket; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.KeyValueUtil; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; import org.apache.hadoop.ozone.om.request.OMClientRequest; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; +import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation; @@ -302,6 +306,46 @@ protected OMRequest doPreExecuteCompleteMPU( } + /** + * Initiate an MPU and optionally rewrite the stored multipart metadata to a + * specific schema version. + * + *

The schema version rewrite lets tests emulate post-finalization MPU + * entries without needing the rest of the upgrade pipeline.

+ */ + protected String initiateMultipartUploadWithSchemaVersion( + String volumeName, String bucketName, String keyName, + int schemaVersion) throws Exception { + OMRequest initiateMPURequest = + doPreExecuteInitiateMPU(volumeName, bucketName, keyName); + + S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = + getS3InitiateMultipartUploadReq(initiateMPURequest); + + OMClientResponse omClientResponse = + s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager, + 1L); + + String multipartUploadID = omClientResponse.getOMResponse() + .getInitiateMultiPartUploadResponse().getMultipartUploadID(); + + if (schemaVersion != 0) { + String multipartKey = omMetadataManager.getMultipartKey(volumeName, + bucketName, keyName, multipartUploadID); + OmMultipartKeyInfo multipartKeyInfo = omMetadataManager + .getMultipartInfoTable().get(multipartKey); + assertNotNull(multipartKeyInfo); + + omMetadataManager.getMultipartInfoTable().addCacheEntry( + new CacheKey<>(multipartKey), + CacheValue.get(2L, multipartKeyInfo.toBuilder() + .setSchemaVersion(schemaVersion) + .build())); + } + + return multipartUploadID; + } + /** * Perform preExecute of Initiate Multipart upload request for given * volume, bucket and key name. diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadAbortRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadAbortRequest.java index d9f84523204e..92a268b27ea6 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadAbortRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadAbortRequest.java @@ -20,13 +20,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.UUID; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.junit.jupiter.api.Test; @@ -95,6 +98,91 @@ public void testValidateAndUpdateCache() throws Exception { } + @Test + public void testValidateAndUpdateCacheUsesSchemaVersionOneBeforeFinalization() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = getKeyName(); + + // The base test fixture is pre-finalized by default. + OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager, getBucketLayout()); + + createParentPath(volumeName, bucketName); + + String multipartUploadID = + initiateMultipartUploadWithSchemaVersion(volumeName, bucketName, + keyName, 1); + + String multipartKey = omMetadataManager.getMultipartKey(volumeName, + bucketName, keyName, multipartUploadID); + OmMultipartKeyInfo multipartKeyInfo = omMetadataManager + .getMultipartInfoTable().get(multipartKey); + assertNotNull(multipartKeyInfo); + assertEquals(1, multipartKeyInfo.getSchemaVersion()); + + OMRequest abortMPURequest = + doPreExecuteAbortMPU(volumeName, bucketName, keyName, + multipartUploadID); + + S3MultipartUploadAbortRequest s3MultipartUploadAbortRequest = + getS3MultipartUploadAbortReq(abortMPURequest); + + OMClientResponse omClientResponse = + s3MultipartUploadAbortRequest.validateAndUpdateCache(ozoneManager, 2L); + + assertEquals(OzoneManagerProtocolProtos.Status.OK, + omClientResponse.getOMResponse().getStatus()); + } + + @Test + public void testValidateAndUpdateCacheAllowsSchemaVersionZeroAfterFinalization() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = getKeyName(); + + // Tests must explicitly bump metadata layout version to simulate finalized OM. + when(ozoneManager.getVersionManager().getMetadataLayoutVersion()) + .thenReturn(OMLayoutFeature.MPU_PARTS_TABLE_SPLIT.layoutVersion()); + + OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager, getBucketLayout()); + + createParentPath(volumeName, bucketName); + + OMRequest initiateMPURequest = doPreExecuteInitiateMPU(volumeName, + bucketName, keyName); + S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = + getS3InitiateMultipartUploadReq(initiateMPURequest); + OMClientResponse initiateResponse = + s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager, + 1L); + String multipartUploadID = initiateResponse.getOMResponse() + .getInitiateMultiPartUploadResponse().getMultipartUploadID(); + + String multipartKey = omMetadataManager.getMultipartKey(volumeName, + bucketName, keyName, multipartUploadID); + OmMultipartKeyInfo multipartKeyInfo = omMetadataManager + .getMultipartInfoTable().get(multipartKey); + assertNotNull(multipartKeyInfo); + assertEquals(0, multipartKeyInfo.getSchemaVersion()); + + OMRequest abortMPURequest = + doPreExecuteAbortMPU(volumeName, bucketName, keyName, + multipartUploadID); + + S3MultipartUploadAbortRequest s3MultipartUploadAbortRequest = + getS3MultipartUploadAbortReq(abortMPURequest); + + OMClientResponse omClientResponse = + s3MultipartUploadAbortRequest.validateAndUpdateCache(ozoneManager, 2L); + + assertEquals(OzoneManagerProtocolProtos.Status.OK, + omClientResponse.getOMResponse().getStatus()); + } + @Test public void testValidateAndUpdateCacheMultipartNotFound() throws Exception { String volumeName = UUID.randomUUID().toString(); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java index b5d78662507a..862b6fd40606 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java @@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -33,7 +34,6 @@ import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; @@ -41,6 +41,7 @@ import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.om.response.s3.multipart.S3MultipartUploadCommitPartResponse; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; @@ -129,6 +130,97 @@ public void testValidateAndUpdateCacheSuccess() throws Exception { .get(partKey)); } + @Test + public void testValidateAndUpdateCacheUsesSchemaVersionOneBeforeFinalization() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = getKeyName(); + + // The base test fixture is pre-finalized by default. + OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager, getBucketLayout()); + + createParentPath(volumeName, bucketName); + + String multipartUploadID = + initiateMultipartUploadWithSchemaVersion(volumeName, bucketName, + keyName, 1); + + String multipartKey = omMetadataManager.getMultipartKey(volumeName, + bucketName, keyName, multipartUploadID); + OmMultipartKeyInfo multipartKeyInfo = omMetadataManager + .getMultipartInfoTable().get(multipartKey); + assertNotNull(multipartKeyInfo); + assertEquals(1, multipartKeyInfo.getSchemaVersion()); + + long clientID = Time.now(); + OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName, + bucketName, keyName, clientID, multipartUploadID, 1); + + S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest = + getS3MultipartUploadCommitReq(commitMultipartRequest); + + addKeyToOpenKeyTable(volumeName, bucketName, keyName, clientID); + + OMClientResponse omClientResponse = + s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager, + 2L); + + assertEquals(OzoneManagerProtocolProtos.Status.OK, + omClientResponse.getOMResponse().getStatus()); + } + + @Test + public void testValidateAndUpdateCacheAllowsSchemaVersionZeroAfterFinalization() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = getKeyName(); + + // Tests must explicitly bump metadata layout version to simulate finalized OM. + when(ozoneManager.getVersionManager().getMetadataLayoutVersion()) + .thenReturn(OMLayoutFeature.MPU_PARTS_TABLE_SPLIT.layoutVersion()); + + OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager, getBucketLayout()); + + createParentPath(volumeName, bucketName); + + OMRequest initiateMPURequest = doPreExecuteInitiateMPU(volumeName, + bucketName, keyName); + S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = + getS3InitiateMultipartUploadReq(initiateMPURequest); + OMClientResponse initiateResponse = + s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager, + 1L); + String multipartUploadID = initiateResponse.getOMResponse() + .getInitiateMultiPartUploadResponse().getMultipartUploadID(); + + String multipartKey = omMetadataManager.getMultipartKey(volumeName, + bucketName, keyName, multipartUploadID); + OmMultipartKeyInfo multipartKeyInfo = omMetadataManager + .getMultipartInfoTable().get(multipartKey); + assertNotNull(multipartKeyInfo); + assertEquals(0, multipartKeyInfo.getSchemaVersion()); + + long clientID = Time.now(); + OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName, + bucketName, keyName, clientID, multipartUploadID, 1); + + S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest = + getS3MultipartUploadCommitReq(commitMultipartRequest); + + addKeyToOpenKeyTable(volumeName, bucketName, keyName, clientID); + + OMClientResponse omClientResponse = + s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager, + 2L); + + assertEquals(OzoneManagerProtocolProtos.Status.OK, + omClientResponse.getOMResponse().getStatus()); + } + @Test public void testValidateAndUpdateCacheMultipartNotFound() throws Exception { String volumeName = UUID.randomUUID().toString(); @@ -162,7 +254,6 @@ public void testValidateAndUpdateCacheMultipartNotFound() throws Exception { bucketName, keyName, multipartUploadID); assertNull(omMetadataManager.getMultipartInfoTable().get(multipartKey)); - } @Test @@ -174,9 +265,19 @@ public void testValidateAndUpdateCacheKeyNotFound() throws Exception { OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, omMetadataManager, getBucketLayout()); + createParentPath(volumeName, bucketName); + + OMRequest initiateMPURequest = doPreExecuteInitiateMPU(volumeName, + bucketName, keyName); + S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = + getS3InitiateMultipartUploadReq(initiateMPURequest); + OMClientResponse initiateResponse = + s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager, + 1L); long clientID = Time.now(); - String multipartUploadID = UUID.randomUUID().toString(); + String multipartUploadID = initiateResponse.getOMResponse() + .getInitiateMultiPartUploadResponse().getMultipartUploadID(); OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName, bucketName, keyName, clientID, multipartUploadID, 1); @@ -191,13 +292,8 @@ public void testValidateAndUpdateCacheKeyNotFound() throws Exception { OMClientResponse omClientResponse = s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager, 2L); - if (getBucketLayout() == BucketLayout.FILE_SYSTEM_OPTIMIZED) { - assertSame(omClientResponse.getOMResponse().getStatus(), - OzoneManagerProtocolProtos.Status.DIRECTORY_NOT_FOUND); - } else { - assertSame(omClientResponse.getOMResponse().getStatus(), - OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND); - } + assertSame(omClientResponse.getOMResponse().getStatus(), + OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java index dbf82cc18ea0..c4919147e982 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java @@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -37,9 +38,11 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Part; @@ -206,6 +209,80 @@ private String checkValidateAndUpdateCacheSuccess(String volumeName, return multipartUploadID; } + @Test + public void testValidateAndUpdateCacheUsesSchemaVersionOneBeforeFinalization() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = getKeyName(); + + // The base test fixture is pre-finalized by default. + OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager, getBucketLayout()); + + String multipartUploadID = + initiateMultipartUploadWithSchemaVersion(volumeName, bucketName, + keyName, 1); + + OMRequest completeMultipartRequest = doPreExecuteCompleteMPU(volumeName, + bucketName, keyName, multipartUploadID, new ArrayList<>()); + + S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest = + getS3MultipartUploadCompleteReq(completeMultipartRequest); + + OMClientResponse omClientResponse = + s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager, + 3L); + + assertEquals(OzoneManagerProtocolProtos.Status.INVALID_REQUEST, + omClientResponse.getOMResponse().getStatus()); + } + + @Test + public void testValidateAndUpdateCacheAllowsSchemaVersionZeroAfterFinalization() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = getKeyName(); + + // Tests must explicitly bump metadata layout version to simulate finalized OM. + when(ozoneManager.getVersionManager().getMetadataLayoutVersion()) + .thenReturn(OMLayoutFeature.MPU_PARTS_TABLE_SPLIT.layoutVersion()); + + OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager, getBucketLayout()); + + OMRequest initiateMPURequest = doPreExecuteInitiateMPU(volumeName, + bucketName, keyName); + S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = + getS3InitiateMultipartUploadReq(initiateMPURequest); + OMClientResponse initiateResponse = + s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager, + 1L); + String multipartUploadID = initiateResponse.getOMResponse() + .getInitiateMultiPartUploadResponse().getMultipartUploadID(); + + String multipartKey = getMultipartKey(volumeName, bucketName, keyName, + multipartUploadID); + OmMultipartKeyInfo multipartKeyInfo = omMetadataManager + .getMultipartInfoTable().get(multipartKey); + assertNotNull(multipartKeyInfo); + assertEquals(0, multipartKeyInfo.getSchemaVersion()); + + OMRequest completeMultipartRequest = doPreExecuteCompleteMPU(volumeName, + bucketName, keyName, multipartUploadID, new ArrayList<>()); + + S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest = + getS3MultipartUploadCompleteReq(completeMultipartRequest); + + OMClientResponse omClientResponse = + s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager, + 3L); + + assertEquals(OzoneManagerProtocolProtos.Status.INVALID_REQUEST, + omClientResponse.getOMResponse().getStatus()); + } + protected void addVolumeAndBucket(String volumeName, String bucketName) throws Exception { OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,