Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ protected synchronized boolean shouldFailover(Exception ex) {
return super.shouldFailover(ex);
}

public synchronized boolean shouldFailoverForFollowerRead(Exception ex) {
return shouldFailover(ex);
}

@Override
public synchronized void close() throws IOException { }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,23 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc_.RemoteException;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.ha.GrpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase;
import org.apache.hadoop.ozone.om.helpers.ReadConsistency;
import org.apache.hadoop.ozone.om.protocolPB.grpc.ClientAddressClientInterceptor;
import org.apache.hadoop.ozone.om.protocolPB.grpc.GrpcClientConstants;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ReadConsistencyHint;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -92,6 +99,10 @@ public class GrpcOmTransport implements OmTransport {
private RetryPolicy retryPolicy;
private final GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider;
private volatile boolean useFollowerRead;
private final ReadConsistencyHint followerReadConsistency;
private final ReadConsistencyHint leaderReadConsistency;
private int currentFollowerReadIndex = -1;

public static void setCaCerts(List<X509Certificate> x509Certificates) {
caCerts = x509Certificates;
Expand All @@ -117,6 +128,27 @@ public GrpcOmTransport(ConfigurationSource conf,
omServiceId,
OzoneManagerProtocolPB.class);

this.useFollowerRead = conf.getBoolean(
OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY,
OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT);
String defaultFollowerReadConsistencyStr = conf.get(
OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_KEY,
OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_DEFAULT
);
ReadConsistency defaultFollowerReadConsistency =
ReadConsistency.valueOf(defaultFollowerReadConsistencyStr);
String defaultLeaderReadConsistencyStr = conf.get(
OzoneConfigKeys.OZONE_CLIENT_LEADER_READ_DEFAULT_CONSISTENCY_KEY,
OzoneConfigKeys.OZONE_CLIENT_LEADER_READ_DEFAULT_CONSISTENCY_DEFAULT);
ReadConsistency defaultLeaderReadConsistency =
ReadConsistency.valueOf(defaultLeaderReadConsistencyStr);
Preconditions.assertTrue(defaultFollowerReadConsistency.allowFollowerRead(),
"Invalid follower read consistency " + defaultFollowerReadConsistency);
Preconditions.assertTrue(!defaultLeaderReadConsistency.allowFollowerRead(),
"Invalid leader read consistency " + defaultLeaderReadConsistency);
this.followerReadConsistency = defaultFollowerReadConsistency.getHint();
this.leaderReadConsistency = defaultLeaderReadConsistency.getHint();

start();
}

Expand Down Expand Up @@ -174,7 +206,63 @@ public void start() throws IOException {

@Override
public OMResponse submitRequest(OMRequest payload) throws IOException {
AtomicReference<OMResponse> resp = new AtomicReference<>();
if (useFollowerRead && OmUtils.shouldSendToFollower(payload)) {
return submitRequestWithFollowerRead(payload);
}
return submitRequestToLeader(addReadConsistencyHint(payload,
leaderReadConsistency));
}

private OMResponse submitRequestWithFollowerRead(OMRequest payload)
throws IOException {
OMRequest followerPayload = addReadConsistencyHint(payload,
followerReadConsistency);
int failedCount = 0;
for (int i = 0; useFollowerRead &&
i < omFailoverProxyProvider.getOMProxyMap().getNodeIds().size(); i++) {
String nodeId = getCurrentFollowerReadNodeId();
String followerHost = omFailoverProxyProvider.getGrpcProxyAddress(nodeId);
try {
OMResponse response = submitRequestToHost(followerPayload, followerHost);
LOG.debug("Invocation with cmdType {} using follower read host {} was successful",
followerPayload.getCmdType(), followerHost);
return response;
} catch (StatusRuntimeException e) {
LOG.debug("Invocation with cmdType {} using follower read host {} failed",
followerPayload.getCmdType(), followerHost, e);
Exception unwrapped = unwrapException(new Exception(e));
if (OMFailoverProxyProviderBase.getNotLeaderException(unwrapped) != null) {
LOG.debug("Encountered OMNotLeaderException from {}. Disable OM follower read and retry OM leader directly.",
followerHost);
useFollowerRead = false;
break;
}
if (OMFailoverProxyProviderBase.getLeaderNotReadyException(unwrapped) != null) {
break;
}
ReadIndexException readIndexException =
OMFailoverProxyProviderBase.getReadIndexException(unwrapped);
ReadException readException =
OMFailoverProxyProviderBase.getReadException(unwrapped);
if (readIndexException != null || readException != null ||
omFailoverProxyProvider.shouldFailoverForFollowerRead(unwrapped)) {
failedCount++;
changeFollowerReadProxy(nodeId);
} else {
throw e;
}
}
}
if (failedCount > 0) {
LOG.warn("{} nodes have failed for read request with cmdType {}. Falling back to leader.",
failedCount, payload.getCmdType());
}
return submitRequestToLeader(addReadConsistencyHint(payload,
leaderReadConsistency));
}

private OMResponse submitRequestToLeader(OMRequest payload)
throws IOException {
int requestFailoverCount = 0;
boolean tryOtherHost = true;
int expectedFailoverCount = 0;
Expand All @@ -183,14 +271,7 @@ public OMResponse submitRequest(OMRequest payload) throws IOException {
tryOtherHost = false;
expectedFailoverCount = globalFailoverCount.get();
try {
InetAddress inetAddress = InetAddress.getLocalHost();
Context.current()
.withValue(GrpcClientConstants.CLIENT_IP_ADDRESS_CTX_KEY,
inetAddress.getHostAddress())
.withValue(GrpcClientConstants.CLIENT_HOSTNAME_CTX_KEY,
inetAddress.getHostName())
.run(() -> resp.set(clients.get(host.get())
.submitRequest(payload)));
return submitRequestToHost(payload, host.get());
} catch (StatusRuntimeException e) {
LOG.error("Failed to submit request", e);
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
Expand All @@ -208,9 +289,49 @@ public OMResponse submitRequest(OMRequest payload) throws IOException {
}
}
}
throw new OMException(resultCode);
}

private OMResponse submitRequestToHost(OMRequest payload, String targetHost)
throws IOException {
AtomicReference<OMResponse> resp = new AtomicReference<>();
InetAddress inetAddress = InetAddress.getLocalHost();
Context.current()
.withValue(GrpcClientConstants.CLIENT_IP_ADDRESS_CTX_KEY,
inetAddress.getHostAddress())
.withValue(GrpcClientConstants.CLIENT_HOSTNAME_CTX_KEY,
inetAddress.getHostName())
.run(() -> resp.set(clients.get(targetHost)
.submitRequest(payload)));
return resp.get();
}

private OMRequest addReadConsistencyHint(OMRequest payload,
ReadConsistencyHint readConsistencyHint) {
if (!payload.hasReadConsistencyHint() && readConsistencyHint != null) {
return payload.toBuilder()
.setReadConsistencyHint(readConsistencyHint)
.build();
}
return payload;
}

private synchronized String getCurrentFollowerReadNodeId() {
if (currentFollowerReadIndex < 0) {
currentFollowerReadIndex = 0;
}
return new ArrayList<>(omFailoverProxyProvider.getOMProxyMap().getNodeIds())
.get(currentFollowerReadIndex);
}

private synchronized void changeFollowerReadProxy(String currentNodeId) {
String currentFollowerReadNodeId = getCurrentFollowerReadNodeId();
if (currentFollowerReadNodeId.equals(currentNodeId)) {
currentFollowerReadIndex = (currentFollowerReadIndex + 1) %
omFailoverProxyProvider.getOMProxyMap().getNodeIds().size();
}
}

private Exception unwrapException(Exception ex) {
Exception grpcException = null;
try {
Expand All @@ -230,11 +351,10 @@ private Exception unwrapException(Exception ex) {
grpcException = cn.newInstance(status.getDescription());
IOException remote = null;
try {
String cause = status.getDescription();
int colonIndex = cause.indexOf(':');
cause = cause.substring(colonIndex + 2);
remote = new RemoteException(cause.substring(0, colonIndex),
cause.substring(colonIndex + 1));
String description = status.getDescription();
int colonIndex = description.indexOf(':');
remote = new RemoteException(description.substring(0, colonIndex),
description.substring(colonIndex + 2));
grpcException.initCause(remote);
} catch (Exception e) {
LOG.error("cannot get cause for remote exception");
Expand Down Expand Up @@ -371,4 +491,32 @@ public void startClient(ManagedChannel testChannel) throws IOException {
LOG.info("{}: started", CLIENT_NAME);
}

@VisibleForTesting
public void startClient(String nodeId, ManagedChannel testChannel) throws IOException {
String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId);
clients.put(hostaddr,
OzoneManagerServiceGrpc
.newBlockingStub(testChannel));
LOG.info("{}: started test client for {}", CLIENT_NAME, nodeId);
}

@VisibleForTesting
public synchronized void changeFollowerReadInitialProxy(String nodeId) {
List<String> nodeIds = new ArrayList<>(
omFailoverProxyProvider.getOMProxyMap().getNodeIds());
for (int i = 0; i < nodeIds.size(); i++) {
if (nodeIds.get(i).equals(nodeId)) {
currentFollowerReadIndex = i;
return;
}
}
}

@VisibleForTesting
public void changeLeaderProxyForTest(String nodeId) throws IOException {
omFailoverProxyProvider.setNextOmProxy(nodeId);
omFailoverProxyProvider.performFailover(null);
host.set(omFailoverProxyProvider.getGrpcProxyAddress(nodeId));
}

}
Loading