diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java index 41cd45956547..471f15789f6d 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java @@ -128,6 +128,10 @@ protected synchronized boolean shouldFailover(Exception ex) { return super.shouldFailover(ex); } + public synchronized boolean shouldFailoverForFollowerRead(Exception ex) { + return shouldFailover(ex); + } + @Override public synchronized void close() throws IOException { } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java index e794107cd54d..21baa053e44d 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java @@ -51,16 +51,23 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc_.RemoteException; +import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import org.apache.hadoop.ozone.om.ha.GrpcOMFailoverProxyProvider; +import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase; +import org.apache.hadoop.ozone.om.helpers.ReadConsistency; import org.apache.hadoop.ozone.om.protocolPB.grpc.ClientAddressClientInterceptor; import org.apache.hadoop.ozone.om.protocolPB.grpc.GrpcClientConstants; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ReadConsistencyHint; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.ratis.protocol.exceptions.ReadException; +import org.apache.ratis.protocol.exceptions.ReadIndexException; +import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,6 +99,10 @@ public class GrpcOmTransport implements OmTransport { private RetryPolicy retryPolicy; private final GrpcOMFailoverProxyProvider omFailoverProxyProvider; + private volatile boolean useFollowerRead; + private final ReadConsistencyHint followerReadConsistency; + private final ReadConsistencyHint leaderReadConsistency; + private int currentFollowerReadIndex = -1; public static void setCaCerts(List x509Certificates) { caCerts = x509Certificates; @@ -117,6 +128,27 @@ public GrpcOmTransport(ConfigurationSource conf, omServiceId, OzoneManagerProtocolPB.class); + this.useFollowerRead = conf.getBoolean( + OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY, + OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT); + String defaultFollowerReadConsistencyStr = conf.get( + OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_KEY, + OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_DEFAULT + ); + ReadConsistency defaultFollowerReadConsistency = + ReadConsistency.valueOf(defaultFollowerReadConsistencyStr); + String defaultLeaderReadConsistencyStr = conf.get( + OzoneConfigKeys.OZONE_CLIENT_LEADER_READ_DEFAULT_CONSISTENCY_KEY, + OzoneConfigKeys.OZONE_CLIENT_LEADER_READ_DEFAULT_CONSISTENCY_DEFAULT); + ReadConsistency defaultLeaderReadConsistency = + ReadConsistency.valueOf(defaultLeaderReadConsistencyStr); + Preconditions.assertTrue(defaultFollowerReadConsistency.allowFollowerRead(), + "Invalid follower read consistency " + defaultFollowerReadConsistency); + Preconditions.assertTrue(!defaultLeaderReadConsistency.allowFollowerRead(), + "Invalid leader read consistency " + defaultLeaderReadConsistency); + this.followerReadConsistency = defaultFollowerReadConsistency.getHint(); + this.leaderReadConsistency = defaultLeaderReadConsistency.getHint(); + start(); } @@ -174,7 +206,63 @@ public void start() throws IOException { @Override public OMResponse submitRequest(OMRequest payload) throws IOException { - AtomicReference resp = new AtomicReference<>(); + if (useFollowerRead && OmUtils.shouldSendToFollower(payload)) { + return submitRequestWithFollowerRead(payload); + } + return submitRequestToLeader(addReadConsistencyHint(payload, + leaderReadConsistency)); + } + + private OMResponse submitRequestWithFollowerRead(OMRequest payload) + throws IOException { + OMRequest followerPayload = addReadConsistencyHint(payload, + followerReadConsistency); + int failedCount = 0; + for (int i = 0; useFollowerRead && + i < omFailoverProxyProvider.getOMProxyMap().getNodeIds().size(); i++) { + String nodeId = getCurrentFollowerReadNodeId(); + String followerHost = omFailoverProxyProvider.getGrpcProxyAddress(nodeId); + try { + OMResponse response = submitRequestToHost(followerPayload, followerHost); + LOG.debug("Invocation with cmdType {} using follower read host {} was successful", + followerPayload.getCmdType(), followerHost); + return response; + } catch (StatusRuntimeException e) { + LOG.debug("Invocation with cmdType {} using follower read host {} failed", + followerPayload.getCmdType(), followerHost, e); + Exception unwrapped = unwrapException(new Exception(e)); + if (OMFailoverProxyProviderBase.getNotLeaderException(unwrapped) != null) { + LOG.debug("Encountered OMNotLeaderException from {}. Disable OM follower read and retry OM leader directly.", + followerHost); + useFollowerRead = false; + break; + } + if (OMFailoverProxyProviderBase.getLeaderNotReadyException(unwrapped) != null) { + break; + } + ReadIndexException readIndexException = + OMFailoverProxyProviderBase.getReadIndexException(unwrapped); + ReadException readException = + OMFailoverProxyProviderBase.getReadException(unwrapped); + if (readIndexException != null || readException != null || + omFailoverProxyProvider.shouldFailoverForFollowerRead(unwrapped)) { + failedCount++; + changeFollowerReadProxy(nodeId); + } else { + throw e; + } + } + } + if (failedCount > 0) { + LOG.warn("{} nodes have failed for read request with cmdType {}. Falling back to leader.", + failedCount, payload.getCmdType()); + } + return submitRequestToLeader(addReadConsistencyHint(payload, + leaderReadConsistency)); + } + + private OMResponse submitRequestToLeader(OMRequest payload) + throws IOException { int requestFailoverCount = 0; boolean tryOtherHost = true; int expectedFailoverCount = 0; @@ -183,14 +271,7 @@ public OMResponse submitRequest(OMRequest payload) throws IOException { tryOtherHost = false; expectedFailoverCount = globalFailoverCount.get(); try { - InetAddress inetAddress = InetAddress.getLocalHost(); - Context.current() - .withValue(GrpcClientConstants.CLIENT_IP_ADDRESS_CTX_KEY, - inetAddress.getHostAddress()) - .withValue(GrpcClientConstants.CLIENT_HOSTNAME_CTX_KEY, - inetAddress.getHostName()) - .run(() -> resp.set(clients.get(host.get()) - .submitRequest(payload))); + return submitRequestToHost(payload, host.get()); } catch (StatusRuntimeException e) { LOG.error("Failed to submit request", e); if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { @@ -208,9 +289,49 @@ public OMResponse submitRequest(OMRequest payload) throws IOException { } } } + throw new OMException(resultCode); + } + + private OMResponse submitRequestToHost(OMRequest payload, String targetHost) + throws IOException { + AtomicReference resp = new AtomicReference<>(); + InetAddress inetAddress = InetAddress.getLocalHost(); + Context.current() + .withValue(GrpcClientConstants.CLIENT_IP_ADDRESS_CTX_KEY, + inetAddress.getHostAddress()) + .withValue(GrpcClientConstants.CLIENT_HOSTNAME_CTX_KEY, + inetAddress.getHostName()) + .run(() -> resp.set(clients.get(targetHost) + .submitRequest(payload))); return resp.get(); } + private OMRequest addReadConsistencyHint(OMRequest payload, + ReadConsistencyHint readConsistencyHint) { + if (!payload.hasReadConsistencyHint() && readConsistencyHint != null) { + return payload.toBuilder() + .setReadConsistencyHint(readConsistencyHint) + .build(); + } + return payload; + } + + private synchronized String getCurrentFollowerReadNodeId() { + if (currentFollowerReadIndex < 0) { + currentFollowerReadIndex = 0; + } + return new ArrayList<>(omFailoverProxyProvider.getOMProxyMap().getNodeIds()) + .get(currentFollowerReadIndex); + } + + private synchronized void changeFollowerReadProxy(String currentNodeId) { + String currentFollowerReadNodeId = getCurrentFollowerReadNodeId(); + if (currentFollowerReadNodeId.equals(currentNodeId)) { + currentFollowerReadIndex = (currentFollowerReadIndex + 1) % + omFailoverProxyProvider.getOMProxyMap().getNodeIds().size(); + } + } + private Exception unwrapException(Exception ex) { Exception grpcException = null; try { @@ -230,11 +351,10 @@ private Exception unwrapException(Exception ex) { grpcException = cn.newInstance(status.getDescription()); IOException remote = null; try { - String cause = status.getDescription(); - int colonIndex = cause.indexOf(':'); - cause = cause.substring(colonIndex + 2); - remote = new RemoteException(cause.substring(0, colonIndex), - cause.substring(colonIndex + 1)); + String description = status.getDescription(); + int colonIndex = description.indexOf(':'); + remote = new RemoteException(description.substring(0, colonIndex), + description.substring(colonIndex + 2)); grpcException.initCause(remote); } catch (Exception e) { LOG.error("cannot get cause for remote exception"); @@ -371,4 +491,32 @@ public void startClient(ManagedChannel testChannel) throws IOException { LOG.info("{}: started", CLIENT_NAME); } + @VisibleForTesting + public void startClient(String nodeId, ManagedChannel testChannel) throws IOException { + String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId); + clients.put(hostaddr, + OzoneManagerServiceGrpc + .newBlockingStub(testChannel)); + LOG.info("{}: started test client for {}", CLIENT_NAME, nodeId); + } + + @VisibleForTesting + public synchronized void changeFollowerReadInitialProxy(String nodeId) { + List nodeIds = new ArrayList<>( + omFailoverProxyProvider.getOMProxyMap().getNodeIds()); + for (int i = 0; i < nodeIds.size(); i++) { + if (nodeIds.get(i).equals(nodeId)) { + currentFollowerReadIndex = i; + return; + } + } + } + + @VisibleForTesting + public void changeLeaderProxyForTest(String nodeId) throws IOException { + omFailoverProxyProvider.setNextOmProxy(nodeId); + omFailoverProxyProvider.performFailover(null); + host.set(omFailoverProxyProvider.getGrpcProxyAddress(nodeId)); + } + } diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java index 176d9b6d03bd..3d2a5fedda34 100644 --- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java @@ -18,8 +18,12 @@ package org.apache.hadoop.ozone.om.protocolPB; import static org.apache.hadoop.ozone.ClientVersion.CURRENT_VERSION; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_PORT_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.AdditionalAnswers.delegatesTo; @@ -29,13 +33,19 @@ import io.grpc.ManagedChannel; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.ha.ConfUtils; import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ReadConsistencyHint; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ReadConsistencyProto; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc; @@ -272,6 +282,215 @@ public void testGrpcFailoverExceedMaxMesgLen() throws Exception { assertThrows(Exception.class, () -> client.submitRequest(omRequest)); } + @Test + public void testFollowerReadDoesNotFailoverFromKnownLeader() throws Exception { + conf.setBoolean(OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY, true); + configureHaOmService("om0", "om1"); + + AtomicInteger leaderRequestCount = new AtomicInteger(); + AtomicInteger followerRequestCount = new AtomicInteger(); + AtomicReference leaderRequest = new AtomicReference<>(); + + client = new GrpcOmTransport(conf, ugi, omServiceId); + client.startClient("om0", createNodeChannel("om0", + leaderRequestCount, leaderRequest)); + client.startClient("om1", createNodeChannel("om1", + followerRequestCount, new AtomicReference<>())); + client.changeLeaderProxyForTest("om0"); + client.changeFollowerReadInitialProxy("om0"); + + OMRequest request = OMRequest.newBuilder() + .setCmdType(Type.ListVolume) + .setVersion(CURRENT_VERSION) + .setClientId("test") + .build(); + + client.submitRequest(request); + + assertEquals(1, leaderRequestCount.get()); + assertEquals(0, followerRequestCount.get()); + assertEquals(ReadConsistencyProto.LINEARIZABLE_ALLOW_FOLLOWER, + leaderRequest.get().getReadConsistencyHint().getReadConsistency()); + } + + @Test + public void testFollowerReadDoesNotRouteWriteRequestToFollower() throws Exception { + conf.setBoolean(OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY, true); + configureHaOmService("om0", "om1"); + + AtomicInteger leaderRequestCount = new AtomicInteger(); + AtomicInteger followerRequestCount = new AtomicInteger(); + AtomicReference leaderRequest = new AtomicReference<>(); + + client = new GrpcOmTransport(conf, ugi, omServiceId); + client.startClient("om0", createNodeChannel("om0", + leaderRequestCount, leaderRequest)); + client.startClient("om1", createNodeChannel("om1", + followerRequestCount, new AtomicReference<>())); + client.changeLeaderProxyForTest("om0"); + client.changeFollowerReadInitialProxy("om1"); + + client.submitRequest(OMRequest.newBuilder() + .setCmdType(Type.CreateVolume) + .setVersion(CURRENT_VERSION) + .setClientId("test") + .build()); + + assertEquals(1, leaderRequestCount.get()); + assertEquals(0, followerRequestCount.get()); + assertEquals(ReadConsistencyProto.DEFAULT, + leaderRequest.get().getReadConsistencyHint().getReadConsistency()); + } + + @Test + public void testFollowerReadKeepsExistingConsistencyHint() throws Exception { + conf.setBoolean(OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY, true); + configureHaOmService("om0", "om1"); + + AtomicInteger followerRequestCount = new AtomicInteger(); + AtomicReference followerRequest = new AtomicReference<>(); + + client = new GrpcOmTransport(conf, ugi, omServiceId); + client.startClient("om0", createNodeChannel("om0", + new AtomicInteger(), new AtomicReference<>())); + client.startClient("om1", createNodeChannel("om1", + followerRequestCount, followerRequest)); + client.changeLeaderProxyForTest("om0"); + client.changeFollowerReadInitialProxy("om1"); + + client.submitRequest(OMRequest.newBuilder() + .setCmdType(Type.ListVolume) + .setVersion(CURRENT_VERSION) + .setClientId("test") + .setReadConsistencyHint(ReadConsistencyHint.newBuilder() + .setReadConsistency(ReadConsistencyProto.LOCAL_LEASE) + .build()) + .build()); + + assertEquals(1, followerRequestCount.get()); + assertEquals(ReadConsistencyProto.LOCAL_LEASE, + followerRequest.get().getReadConsistencyHint().getReadConsistency()); + } + + @Test + public void testFollowerReadFallsBackToLeaderOnNotLeaderException() throws Exception { + conf.setBoolean(OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY, true); + configureHaOmService("om0", "om1"); + + AtomicInteger leaderRequestCount = new AtomicInteger(); + AtomicInteger followerRequestCount = new AtomicInteger(); + AtomicReference leaderRequest = new AtomicReference<>(); + + client = new GrpcOmTransport(conf, ugi, omServiceId); + client.startClient("om0", createNodeChannel("om0", + leaderRequestCount, leaderRequest)); + client.startClient("om1", createNotLeaderNodeChannel(followerRequestCount)); + client.changeLeaderProxyForTest("om0"); + client.changeFollowerReadInitialProxy("om1"); + + client.submitRequest(OMRequest.newBuilder() + .setCmdType(Type.ListVolume) + .setVersion(CURRENT_VERSION) + .setClientId("test") + .build()); + + assertEquals(1, followerRequestCount.get()); + assertEquals(1, leaderRequestCount.get()); + assertEquals(ReadConsistencyProto.DEFAULT, + leaderRequest.get().getReadConsistencyHint().getReadConsistency()); + } + + @Test + public void testFollowerReadRejectsInvalidFollowerReadConsistency() { + conf.setBoolean(OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY, true); + conf.set(OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_KEY, "DEFAULT"); + configureHaOmService("om0", "om1"); + + assertThrows(IllegalStateException.class, + () -> new GrpcOmTransport(conf, ugi, omServiceId)); + } + + @Test + public void testFollowerReadRejectsInvalidLeaderReadConsistency() { + conf.setBoolean(OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY, true); + conf.set(OzoneConfigKeys.OZONE_CLIENT_LEADER_READ_DEFAULT_CONSISTENCY_KEY, + "LINEARIZABLE_ALLOW_FOLLOWER"); + configureHaOmService("om0", "om1"); + + assertThrows(IllegalStateException.class, + () -> new GrpcOmTransport(conf, ugi, omServiceId)); + } + + private void configureHaOmService(String... nodeIds) { + omServiceId = "om-service-test"; + conf.set(OZONE_OM_SERVICE_IDS_KEY, omServiceId); + conf.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, omServiceId), + String.join(",", nodeIds)); + for (int i = 0; i < nodeIds.length; i++) { + conf.set(ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, omServiceId, + nodeIds[i]), "localhost"); + conf.setInt(ConfUtils.addKeySuffixes(OZONE_OM_GRPC_PORT_KEY, omServiceId, + nodeIds[i]), 19880 + i); + } + } + + private ManagedChannel createNodeChannel(String nodeId, + AtomicInteger requestCount, AtomicReference lastRequest) + throws IOException { + String nodeServerName = InProcessServerBuilder.generateName(); + grpcCleanup.register(InProcessServerBuilder + .forName(nodeServerName) + .directExecutor() + .addService(new OzoneManagerServiceGrpc.OzoneManagerServiceImplBase() { + @Override + public void submitRequest(OMRequest request, + StreamObserver responseObserver) { + requestCount.incrementAndGet(); + lastRequest.set(request); + responseObserver.onNext(OMResponse.newBuilder() + .setSuccess(true) + .setStatus(org.apache.hadoop.ozone.protocol + .proto.OzoneManagerProtocolProtos.Status.OK) + .setLeaderOMNodeId(nodeId) + .setCmdType(request.getCmdType()) + .build()); + responseObserver.onCompleted(); + } + }) + .build() + .start()); + return grpcCleanup.register( + InProcessChannelBuilder.forName(nodeServerName).directExecutor().build()); + } + + private ManagedChannel createNotLeaderNodeChannel(AtomicInteger requestCount) + throws IOException { + String nodeServerName = InProcessServerBuilder.generateName(); + grpcCleanup.register(InProcessServerBuilder + .forName(nodeServerName) + .directExecutor() + .addService(new OzoneManagerServiceGrpc.OzoneManagerServiceImplBase() { + @Override + public void submitRequest(OMRequest request, + StreamObserver responseObserver) { + requestCount.incrementAndGet(); + try { + throw createNotLeaderException(); + } catch (Throwable e) { + IOException ex = new IOException(e.getCause()); + responseObserver.onError(io.grpc.Status + .INTERNAL + .withDescription(ex.getMessage()) + .asRuntimeException()); + } + } + }) + .build() + .start()); + return grpcCleanup.register( + InProcessChannelBuilder.forName(nodeServerName).directExecutor().build()); + } + private static OMRequest arbitraryOmRequest() { ServiceListRequest req = ServiceListRequest.newBuilder().build(); return OMRequest.newBuilder() diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/AbstractOzoneManagerHATest.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/AbstractOzoneManagerHATest.java index 8152607b39e9..65fc84f7a544 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/AbstractOzoneManagerHATest.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/AbstractOzoneManagerHATest.java @@ -29,6 +29,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_S3_GPRC_SERVER_ENABLED; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -158,6 +159,7 @@ protected static void initCluster(boolean followerReadEnabled) throws Exception // Enable the OM follower read. omHAConfig.setReadOption("LINEARIZABLE"); omHAConfig.setReadLeaderLeaseEnabled(true); + conf.setBoolean(OZONE_OM_S3_GPRC_SERVER_ENABLED, true); } conf.setFromObject(omHAConfig); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java index 1f66b38c309d..0d8c3cdf24d5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java @@ -20,6 +20,7 @@ import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider; import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider; +import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport; import org.apache.hadoop.ozone.om.protocolPB.Hadoop3OmTransport; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; @@ -45,6 +46,13 @@ static HadoopRpcOMFollowerReadFailoverProxyProvider getFollowerReadFailoverProxy return transport.getOmFollowerReadFailoverProxyProvider(); } + static GrpcOmTransport getGrpcOmTransport(ObjectStore store) { + OzoneManagerProtocolClientSideTranslatorPB ozoneManagerClient = + (OzoneManagerProtocolClientSideTranslatorPB) store.getClientProxy().getOzoneManagerClient(); + + return (GrpcOmTransport) ozoneManagerClient.getTransport(); + } + static String getCurrentOmProxyNodeId(ObjectStore store) { return getFailoverProxyProvider(store).getCurrentProxyOMNodeId(); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java index 9262da093a4a..5dd6a6fecb6f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_LEADER_READ_DEFAULT_CONSISTENCY_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_TRANSPORT_CLASS; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_ALREADY_EXISTS; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE; @@ -40,6 +41,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.stream.Stream; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.StorageType; @@ -59,7 +61,11 @@ import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider; import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider; import org.apache.hadoop.ozone.om.ha.OMProxyInfo; +import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport; +import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransportFactory; +import org.apache.hadoop.ozone.om.protocolPB.Hadoop3OmTransportFactory; import org.apache.hadoop.ozone.om.protocolPB.OmTransport; +import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; @@ -73,6 +79,8 @@ import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB; import org.apache.ozone.test.tag.Flaky; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; /** * Ozone Manager HA follower read tests where all OMs are running throughout all tests. @@ -105,29 +113,63 @@ void testOMFollowerReadProxyProviderInitialization() { } } - @Test - void testFollowerReadTargetsFollower() throws Exception { - ObjectStore objectStore = getObjectStore(); - HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider = - OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore); + private static Stream> followerReadTransportClasses() { + return Stream.>of( + Hadoop3OmTransportFactory.class, + GrpcOmTransportFactory.class); + } + @ParameterizedTest + @MethodSource("followerReadTransportClasses") + void testFollowerReadTargetsFollower(Class omTransportClass) throws Exception { + OzoneConfiguration clientConf = new OzoneConfiguration(getConf()); + clientConf.setBoolean(OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY, true); + clientConf.set(OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_KEY, "LOCAL_LEASE"); + clientConf.set(OZONE_OM_TRANSPORT_CLASS, omTransportClass.getName()); String leaderOMNodeId = getCluster().getOMLeader().getOMNodeId(); - String followerOMNodeId = null; + OzoneManager followerOM = null; for (OzoneManager om : getCluster().getOzoneManagersList()) { if (!om.getOMNodeId().equals(leaderOMNodeId)) { - followerOMNodeId = om.getOMNodeId(); + followerOM = om; break; } } - assertNotNull(followerOMNodeId); + assertNotNull(followerOM); - followerReadFailoverProxyProvider.changeInitialProxyForTest(followerOMNodeId); - objectStore.getClientProxy().listVolumes(null, null, 10); + OzoneClient ozoneClient = null; + try { + ozoneClient = OzoneClientFactory.getRpcClient(getOmServiceId(), clientConf); + ObjectStore objectStore = ozoneClient.getObjectStore(); + changeFollowerReadInitialProxy(objectStore, omTransportClass, leaderOMNodeId, followerOM.getOMNodeId()); + long previousLocalLeaseSuccess = followerOM.getMetrics().getNumFollowerReadLocalLeaseSuccess(); + + objectStore.listVolumes(""); - OMProxyInfo lastProxy = - (OMProxyInfo) followerReadFailoverProxyProvider.getLastProxy(); - assertNotNull(lastProxy); - assertEquals(followerOMNodeId, lastProxy.getNodeId()); + long currentLocalLeaseSuccess = followerOM.getMetrics().getNumFollowerReadLocalLeaseSuccess(); + assertThat(currentLocalLeaseSuccess).isGreaterThan(previousLocalLeaseSuccess); + } finally { + IOUtils.closeQuietly(ozoneClient); + } + } + + private void changeFollowerReadInitialProxy(ObjectStore objectStore, + Class omTransportClass, String leaderOMNodeId, String followerOMNodeId) + throws Exception { + if (Hadoop3OmTransportFactory.class.equals(omTransportClass)) { + HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider = + OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore); + followerReadFailoverProxyProvider.changeInitialProxyForTest(followerOMNodeId); + return; + } + + if (GrpcOmTransportFactory.class.equals(omTransportClass)) { + GrpcOmTransport grpcOmTransport = OmTestUtil.getGrpcOmTransport(objectStore); + grpcOmTransport.changeLeaderProxyForTest(leaderOMNodeId); + grpcOmTransport.changeFollowerReadInitialProxy(followerOMNodeId); + return; + } + + throw new IllegalArgumentException("Unsupported OM transport class " + omTransportClass); } /** @@ -571,4 +613,5 @@ void testClientWithLocalLeaseEnabled() throws Exception { IOUtils.closeQuietly(ozoneClient); } } + } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithStoppedNodes.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithStoppedNodes.java index 878bfad603ba..e26e5328d7a3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithStoppedNodes.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithStoppedNodes.java @@ -312,6 +312,7 @@ void testIncrementalWaitTimeWithSameNodeFailover() throws Exception { } @Test + @Order(Integer.MAX_VALUE) void testOMRetryProxy() { int maxFailoverAttempts = getOzoneClientFailoverMaxAttempts(); // Stop all the OMs. diff --git a/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java index 99b1272f82cb..db352dbcd62c 100644 --- a/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java +++ b/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java @@ -376,12 +376,15 @@ private static void configureOMPorts(ConfigurationTarget conf, OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, omServiceId, omNodeId); String omHttpsAddrKey = ConfUtils.addKeySuffixes( OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, omServiceId, omNodeId); + String omGrpcPortKey = ConfUtils.addKeySuffixes( + OMConfigKeys.OZONE_OM_GRPC_PORT_KEY, omServiceId, omNodeId); String omRatisPortKey = ConfUtils.addKeySuffixes( OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, omServiceId, omNodeId); conf.set(omAddrKey, localhostWithFreePort()); conf.set(omHttpAddrKey, localhostWithFreePort()); conf.set(omHttpsAddrKey, localhostWithFreePort()); + conf.setInt(omGrpcPortKey, getFreePort()); conf.setInt(omRatisPortKey, getFreePort()); }