diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java index e146e221049a..33111fd9e885 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java @@ -217,7 +217,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { Boolean reqGoogleDriveScope; private final Properties clientInfo = new Properties(); private boolean isReadOnlyTokenUsed = false; - private final ExecutorService metadataExecutor; + private ExecutorService metadataExecutor; private final ExecutorService queryExecutor; BigQueryConnection(String url) throws IOException { @@ -362,11 +362,6 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { this.headerProvider = createHeaderProvider(); this.bigQuery = getBigQueryConnection(); - // Fixed thread pool queues tasks to limit concurrent metadata calls and prevent API - // throttling. - this.metadataExecutor = - BigQueryJdbcMdc.newFixedThreadPool( - String.format("BQ-Metadata-%s", connectionId), metadataFetchThreadCount); // Cached pool executes queries immediately without queueing and reclaims all idle threads // when inactive, minimizing resources. this.queryExecutor = @@ -954,7 +949,7 @@ public void setHoldability(int holdability) throws SQLException { * @see Connection#close() */ @Override - public void close() throws SQLException { + public synchronized void close() throws SQLException { if (isClosed()) { return; } @@ -1067,7 +1062,14 @@ ExecutorService getExecutorService() { return this.queryExecutor; } - ExecutorService getMetadataExecutor() { + synchronized ExecutorService getMetadataExecutor() { + if (this.metadataExecutor != null) { + return this.metadataExecutor; + } + checkClosed(); + this.metadataExecutor = + BigQueryJdbcMdc.newFixedThreadPool( + String.format("BQ-Metadata-%s", connectionId), metadataFetchThreadCount); return this.metadataExecutor; } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 836b3cb3ea6c..a4f91ca0381d 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -68,11 +68,9 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; @@ -802,13 +800,11 @@ public ResultSet getProcedures( final BlockingQueue queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); final List collectedResults = Collections.synchronizedList(new ArrayList<>()); - final List> processingTaskFutures = new ArrayList<>(); final String catalogParam = catalog; Runnable procedureFetcher = () -> { ExecutorService apiExecutor = null; - ExecutorService routineProcessorExecutor = null; final FieldList localResultSchemaFields = resultSchemaFields; final List>> apiFutures = new ArrayList<>(); @@ -830,8 +826,7 @@ public ResultSet getProcedures( return; } - apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE); - routineProcessorExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); + apiExecutor = connection.getMetadataExecutor(); LOG.fine("Submitting parallel findMatchingRoutines tasks..."); for (Dataset dataset : datasetsToScan) { @@ -862,7 +857,6 @@ public ResultSet getProcedures( apiFutures.add(apiFuture); } LOG.fine("Finished submitting " + apiFutures.size() + " findMatchingRoutines tasks."); - apiExecutor.shutdown(); LOG.fine("Processing results from findMatchingRoutines tasks..."); for (Future> apiFuture : apiFutures) { @@ -877,15 +871,8 @@ public ResultSet getProcedures( if (Thread.currentThread().isInterrupted()) break; if ("PROCEDURE".equalsIgnoreCase(routine.getRoutineType())) { - LOG.fine( - "Submitting processing task for procedure: " + routine.getRoutineId()); - final Routine finalRoutine = routine; - Future processFuture = - routineProcessorExecutor.submit( - () -> - processProcedureInfo( - finalRoutine, collectedResults, localResultSchemaFields)); - processingTaskFutures.add(processFuture); + LOG.fine("Processing procedure sequentially: " + routine.getRoutineId()); + processProcedureInfo(routine, collectedResults, localResultSchemaFields); } else { LOG.finer("Skipping non-procedure routine: " + routine.getRoutineId()); } @@ -906,21 +893,6 @@ public ResultSet getProcedures( } } - LOG.fine( - "Finished submitting " - + processingTaskFutures.size() - + " processProcedureInfo tasks."); - - if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Fetcher interrupted before waiting for processing tasks; cancelling remaining."); - processingTaskFutures.forEach(f -> f.cancel(true)); - } else { - LOG.fine("Waiting for processProcedureInfo tasks to complete..."); - waitForTasksCompletion(processingTaskFutures); - LOG.fine("All processProcedureInfo tasks completed or handled."); - } - if (!Thread.currentThread().isInterrupted()) { Comparator comparator = defineGetProceduresComparator(localResultSchemaFields); @@ -933,22 +905,18 @@ public ResultSet getProcedures( } catch (Throwable t) { LOG.severe("Unexpected error in procedure fetcher runnable: " + t.getMessage()); - apiFutures.forEach(f -> f.cancel(true)); - processingTaskFutures.forEach(f -> f.cancel(true)); } finally { + apiFutures.forEach(f -> f.cancel(true)); signalEndOfData(queue, localResultSchemaFields); - shutdownExecutor(apiExecutor); - shutdownExecutor(routineProcessorExecutor); LOG.info("Procedure fetcher thread finished."); } }; - Thread fetcherThread = new Thread(procedureFetcher, "getProcedures-fetcher-" + catalog); + Future fetcherFuture = connection.getExecutorService().submit(procedureFetcher); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Future[] {fetcherFuture}); - fetcherThread.start(); - LOG.info("Started background thread for getProcedures"); + LOG.info("Submitted background task for getProcedures to metadata executor"); return resultSet; } @@ -1087,17 +1055,12 @@ public ResultSet getProcedureColumns( final BlockingQueue queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); final List collectedResults = Collections.synchronizedList(new ArrayList<>()); - final List> processingTaskFutures = new ArrayList<>(); final String catalogParam = catalog; Runnable procedureColumnFetcher = () -> { ExecutorService listRoutinesExecutor = null; ExecutorService getRoutineDetailsExecutor = null; - ExecutorService processArgsExecutor = null; - - final String fetcherThreadNameSuffix = - "-" + catalogParam.substring(0, Math.min(10, catalogParam.length())); try { List datasetsToScan = @@ -1108,10 +1071,7 @@ public ResultSet getProcedureColumns( return; } - listRoutinesExecutor = - Executors.newFixedThreadPool( - API_EXECUTOR_POOL_SIZE, - runnable -> new Thread(runnable, "pcol-list-rout" + fetcherThreadNameSuffix)); + listRoutinesExecutor = connection.getMetadataExecutor(); List procedureIdsToGet = listMatchingProcedureIdsFromDatasets( datasetsToScan, @@ -1120,7 +1080,6 @@ public ResultSet getProcedureColumns( listRoutinesExecutor, catalogParam, LOG); - shutdownExecutor(listRoutinesExecutor); listRoutinesExecutor = null; if (procedureIdsToGet.isEmpty() || Thread.currentThread().isInterrupted()) { @@ -1128,13 +1087,9 @@ public ResultSet getProcedureColumns( return; } - getRoutineDetailsExecutor = - Executors.newFixedThreadPool( - 100, - runnable -> new Thread(runnable, "pcol-get-details" + fetcherThreadNameSuffix)); + getRoutineDetailsExecutor = connection.getMetadataExecutor(); List fullRoutines = fetchFullRoutineDetailsForIds(procedureIdsToGet, getRoutineDetailsExecutor, LOG); - shutdownExecutor(getRoutineDetailsExecutor); getRoutineDetailsExecutor = null; if (fullRoutines.isEmpty() || Thread.currentThread().isInterrupted()) { @@ -1143,35 +1098,8 @@ public ResultSet getProcedureColumns( return; } - processArgsExecutor = - Executors.newFixedThreadPool( - this.metadataFetchThreadCount, - runnable -> new Thread(runnable, "pcol-proc-args" + fetcherThreadNameSuffix)); - submitProcedureArgumentProcessingJobs( - fullRoutines, - columnNameRegex, - collectedResults, - resultSchema.getFields(), - processArgsExecutor, - processingTaskFutures, - LOG); - - if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Fetcher: Interrupted before waiting for argument processing. Catalog: " - + catalogParam); - processingTaskFutures.forEach(f -> f.cancel(true)); - } else { - LOG.fine( - "Fetcher: Waiting for " - + processingTaskFutures.size() - + " argument processing tasks. Catalog: " - + catalogParam); - waitForTasksCompletion(processingTaskFutures); - LOG.fine( - "Fetcher: All argument processing tasks completed or handled. Catalog: " - + catalogParam); - } + processProcedureArgumentsSequentially( + fullRoutines, columnNameRegex, collectedResults, resultSchema.getFields(), LOG); if (!Thread.currentThread().isInterrupted()) { Comparator comparator = @@ -1187,30 +1115,23 @@ public ResultSet getProcedureColumns( + catalogParam + ". Error: " + e.getMessage()); - processingTaskFutures.forEach(f -> f.cancel(true)); } catch (Throwable t) { LOG.severe( "Fetcher: Unexpected error in main try block for catalog " + catalogParam + ". Error: " + t.getMessage()); - processingTaskFutures.forEach(f -> f.cancel(true)); } finally { signalEndOfData(queue, resultSchema.getFields()); - if (listRoutinesExecutor != null) shutdownExecutor(listRoutinesExecutor); - if (getRoutineDetailsExecutor != null) shutdownExecutor(getRoutineDetailsExecutor); - if (processArgsExecutor != null) shutdownExecutor(processArgsExecutor); LOG.info("Procedure column fetcher thread finished for catalog: " + catalogParam); } }; - Thread fetcherThread = - new Thread(procedureColumnFetcher, "getProcedureColumns-fetcher-" + catalog); + Future fetcherFuture = connection.getExecutorService().submit(procedureColumnFetcher); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Future[] {fetcherFuture}); - fetcherThread.start(); - LOG.info("Started background thread for getProcedureColumns for catalog: " + catalog); + LOG.info("Started background task for getProcedureColumns for catalog: " + catalog); return resultSet; } @@ -1378,33 +1299,26 @@ List fetchFullRoutineDetailsForIds( return fullRoutines; } - void submitProcedureArgumentProcessingJobs( + void processProcedureArgumentsSequentially( List fullRoutines, Pattern columnNameRegex, List collectedResults, FieldList resultSchemaFields, - ExecutorService processArgsExecutor, - List> outArgumentProcessingFutures, BigQueryJdbcCustomLogger logger) throws InterruptedException { - logger.fine("Submitting argument processing jobs for %d routines.", fullRoutines.size()); + logger.fine("Processing argument jobs sequentially for %d routines.", fullRoutines.size()); for (Routine fullRoutine : fullRoutines) { if (Thread.currentThread().isInterrupted()) { InterruptedException ex = - new InterruptedException("Interrupted while submitting argument processing jobs"); + new InterruptedException("Interrupted while processing argument jobs sequentially"); logger.severe(ex.getMessage(), ex); throw ex; } if (fullRoutine != null) { if ("PROCEDURE".equalsIgnoreCase(fullRoutine.getRoutineType())) { - final Routine finalFullRoutine = fullRoutine; - Future processFuture = - processArgsExecutor.submit( - () -> - processProcedureArguments( - finalFullRoutine, columnNameRegex, collectedResults, resultSchemaFields)); - outArgumentProcessingFutures.add(processFuture); + processProcedureArguments( + fullRoutine, columnNameRegex, collectedResults, resultSchemaFields); } else { logger.warning( "Routine " @@ -1417,10 +1331,6 @@ void submitProcedureArgumentProcessingJobs( } } } - logger.fine( - "Finished submitting " - + outArgumentProcessingFutures.size() - + " processProcedureArguments tasks."); } Schema defineGetProcedureColumnsSchema() { @@ -1745,10 +1655,8 @@ public ResultSet getTables( Runnable tableFetcher = () -> { ExecutorService apiExecutor = null; - ExecutorService tableProcessorExecutor = null; final FieldList localResultSchemaFields = resultSchemaFields; final List>> apiFutures = new ArrayList<>(); - final List> processingFutures = new ArrayList<>(); try { List datasetsToScan = @@ -1768,8 +1676,7 @@ public ResultSet getTables( return; } - apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE); - tableProcessorExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); + apiExecutor = connection.getMetadataExecutor(); LOG.fine("Submitting parallel findMatchingTables tasks..."); for (Dataset dataset : datasetsToScan) { @@ -1800,7 +1707,6 @@ public ResultSet getTables( apiFutures.add(apiFuture); } LOG.fine("Finished submitting " + apiFutures.size() + " findMatchingTables tasks."); - apiExecutor.shutdown(); LOG.fine("Processing results from findMatchingTables tasks..."); for (Future> apiFuture : apiFutures) { @@ -1814,16 +1720,9 @@ public ResultSet getTables( for (Table table : tablesResult) { if (Thread.currentThread().isInterrupted()) break; - final Table currentTable = table; - Future processFuture = - tableProcessorExecutor.submit( - () -> - processTableInfo( - currentTable, - requestedTypes, - collectedResults, - localResultSchemaFields)); - processingFutures.add(processFuture); + LOG.fine("Processing table sequentially: " + table.getTableId()); + processTableInfo( + table, requestedTypes, collectedResults, localResultSchemaFields); } } } catch (InterruptedException e) { @@ -1841,19 +1740,6 @@ public ResultSet getTables( } } - LOG.fine( - "Finished submitting " + processingFutures.size() + " processTableInfo tasks."); - - if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Fetcher interrupted before waiting for processing tasks; cancelling remaining."); - processingFutures.forEach(f -> f.cancel(true)); - } else { - LOG.fine("Waiting for processTableInfo tasks to complete..."); - waitForTasksCompletion(processingFutures); - LOG.fine("All processTableInfo tasks completed."); - } - if (!Thread.currentThread().isInterrupted()) { Comparator comparator = defineGetTablesComparator(localResultSchemaFields); @@ -1866,21 +1752,17 @@ public ResultSet getTables( } catch (Throwable t) { LOG.severe("Unexpected error in table fetcher runnable: " + t.getMessage()); - apiFutures.forEach(f -> f.cancel(true)); - processingFutures.forEach(f -> f.cancel(true)); } finally { + apiFutures.forEach(f -> f.cancel(true)); signalEndOfData(queue, localResultSchemaFields); - shutdownExecutor(apiExecutor); - shutdownExecutor(tableProcessorExecutor); LOG.info("Table fetcher thread finished."); } }; - Thread fetcherThread = new Thread(tableFetcher, "getTables-fetcher-" + effectiveCatalog); + Future fetcherFuture = connection.getExecutorService().submit(tableFetcher); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Future[] {fetcherFuture}); - fetcherThread.start(); LOG.info("Started background thread for getTables"); return resultSet; } @@ -2132,7 +2014,7 @@ public ResultSet getColumns( return; } - columnExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); + columnExecutor = connection.getMetadataExecutor(); for (Dataset dataset : datasetsToScan) { if (Thread.currentThread().isInterrupted()) { @@ -2195,19 +2077,17 @@ public ResultSet getColumns( } catch (Throwable t) { LOG.severe("Unexpected error in column fetcher runnable: " + t.getMessage()); - taskFutures.forEach(f -> f.cancel(true)); } finally { + taskFutures.forEach(f -> f.cancel(true)); signalEndOfData(queue, localResultSchemaFields); - shutdownExecutor(columnExecutor); LOG.info("Column fetcher thread finished."); } }; - Thread fetcherThread = new Thread(columnFetcher, "getColumns-fetcher-" + effectiveCatalog); + Future fetcherFuture = connection.getExecutorService().submit(columnFetcher); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Future[] {fetcherFuture}); - fetcherThread.start(); LOG.info("Started background thread for getColumns"); return resultSet; } @@ -3720,12 +3600,11 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce } }; - Thread fetcherThread = new Thread(schemaFetcher, "getSchemas-fetcher-" + catalog); + Future fetcherFuture = connection.getExecutorService().submit(schemaFetcher); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Future[] {fetcherFuture}); - fetcherThread.start(); - LOG.info("Started background thread for getSchemas"); + LOG.info("Submitted background task for getSchemas to metadata executor"); return resultSet; } @@ -3887,13 +3766,11 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct final BlockingQueue queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); final List collectedResults = Collections.synchronizedList(new ArrayList<>()); - final List> processingTaskFutures = new ArrayList<>(); final String catalogParam = catalog; Runnable functionFetcher = () -> { ExecutorService apiExecutor = null; - ExecutorService routineProcessorExecutor = null; final FieldList localResultSchemaFields = resultSchemaFields; final List>> apiFutures = new ArrayList<>(); @@ -3915,8 +3792,7 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct return; } - apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE); - routineProcessorExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); + apiExecutor = connection.getMetadataExecutor(); for (Dataset dataset : datasetsToScan) { if (Thread.currentThread().isInterrupted()) { @@ -3954,7 +3830,6 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct "Finished submitting " + apiFutures.size() + " findMatchingRoutines (for functions) tasks."); - apiExecutor.shutdown(); for (Future> apiFuture : apiFutures) { if (Thread.currentThread().isInterrupted()) { @@ -3972,17 +3847,11 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct if ("SCALAR_FUNCTION".equalsIgnoreCase(routineType) || "TABLE_FUNCTION".equalsIgnoreCase(routineType)) { LOG.fine( - "Submitting processing task for function: " + "Processing function sequentially: " + routine.getRoutineId() + " of type " + routineType); - final Routine finalRoutine = routine; - Future processFuture = - routineProcessorExecutor.submit( - () -> - processFunctionInfo( - finalRoutine, collectedResults, localResultSchemaFields)); - processingTaskFutures.add(processFuture); + processFunctionInfo(routine, collectedResults, localResultSchemaFields); } } } @@ -3997,28 +3866,23 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct + e.getMessage()); } } - waitForTasksCompletion(processingTaskFutures); Comparator comparator = defineGetFunctionsComparator(localResultSchemaFields); sortResults(collectedResults, comparator, "getFunctions", LOG); populateQueue(collectedResults, queue, localResultSchemaFields); } catch (Throwable t) { LOG.severe("Unexpected error in function fetcher runnable: " + t.getMessage()); - apiFutures.forEach(f -> f.cancel(true)); - processingTaskFutures.forEach(f -> f.cancel(true)); } finally { + apiFutures.forEach(f -> f.cancel(true)); signalEndOfData(queue, localResultSchemaFields); - shutdownExecutor(apiExecutor); - shutdownExecutor(routineProcessorExecutor); LOG.info("Function fetcher thread finished."); } }; - Thread fetcherThread = new Thread(functionFetcher, "getFunctions-fetcher-" + catalog); + Future fetcherFuture = connection.getExecutorService().submit(functionFetcher); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Future[] {fetcherFuture}); - fetcherThread.start(); LOG.info("Started background thread for getFunctions"); return resultSet; } @@ -4139,16 +4003,12 @@ public ResultSet getFunctionColumns( final BlockingQueue queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); final List collectedResults = Collections.synchronizedList(new ArrayList<>()); - final List> processingTaskFutures = new ArrayList<>(); final String catalogParam = catalog; Runnable functionColumnFetcher = () -> { ExecutorService listRoutinesExecutor = null; ExecutorService getRoutineDetailsExecutor = null; - ExecutorService processParamsExecutor = null; - final String fetcherThreadNameSuffix = - "-" + catalogParam.substring(0, Math.min(10, catalogParam.length())); try { List datasetsToScan = @@ -4169,10 +4029,7 @@ public ResultSet getFunctionColumns( return; } - listRoutinesExecutor = - Executors.newFixedThreadPool( - API_EXECUTOR_POOL_SIZE, - runnable -> new Thread(runnable, "funcol-list-rout" + fetcherThreadNameSuffix)); + listRoutinesExecutor = connection.getMetadataExecutor(); List functionIdsToGet = listMatchingFunctionIdsFromDatasets( datasetsToScan, @@ -4181,7 +4038,6 @@ public ResultSet getFunctionColumns( listRoutinesExecutor, catalogParam, LOG); - shutdownExecutor(listRoutinesExecutor); listRoutinesExecutor = null; if (functionIdsToGet.isEmpty() || Thread.currentThread().isInterrupted()) { @@ -4189,14 +4045,9 @@ public ResultSet getFunctionColumns( return; } - getRoutineDetailsExecutor = - Executors.newFixedThreadPool( - this.metadataFetchThreadCount, - runnable -> - new Thread(runnable, "funcol-get-details" + fetcherThreadNameSuffix)); + getRoutineDetailsExecutor = connection.getMetadataExecutor(); List fullFunctions = fetchFullRoutineDetailsForIds(functionIdsToGet, getRoutineDetailsExecutor, LOG); - shutdownExecutor(getRoutineDetailsExecutor); getRoutineDetailsExecutor = null; if (fullFunctions.isEmpty() || Thread.currentThread().isInterrupted()) { @@ -4205,36 +4056,8 @@ public ResultSet getFunctionColumns( return; } - processParamsExecutor = - Executors.newFixedThreadPool( - this.metadataFetchThreadCount, - runnable -> - new Thread(runnable, "funcol-proc-params" + fetcherThreadNameSuffix)); - submitFunctionParameterProcessingJobs( - fullFunctions, - columnNameRegex, - collectedResults, - resultSchemaFields, - processParamsExecutor, - processingTaskFutures, - LOG); - - if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Fetcher: Interrupted before waiting for parameter processing. Catalog: " - + catalogParam); - processingTaskFutures.forEach(f -> f.cancel(true)); - } else { - LOG.fine( - "Fetcher: Waiting for " - + processingTaskFutures.size() - + " parameter processing tasks. Catalog: " - + catalogParam); - waitForTasksCompletion(processingTaskFutures); - LOG.fine( - "Fetcher: All parameter processing tasks completed or handled. Catalog: " - + catalogParam); - } + processFunctionParametersSequentially( + fullFunctions, columnNameRegex, collectedResults, resultSchemaFields, LOG); if (!Thread.currentThread().isInterrupted()) { Comparator comparator = @@ -4250,29 +4073,22 @@ public ResultSet getFunctionColumns( + catalogParam + ". Error: " + e.getMessage()); - processingTaskFutures.forEach(f -> f.cancel(true)); } catch (Throwable t) { LOG.severe( "Fetcher: Unexpected error in main try block for catalog " + catalogParam + ". Error: " + t.getMessage()); - processingTaskFutures.forEach(f -> f.cancel(true)); } finally { signalEndOfData(queue, resultSchemaFields); - if (listRoutinesExecutor != null) shutdownExecutor(listRoutinesExecutor); - if (getRoutineDetailsExecutor != null) shutdownExecutor(getRoutineDetailsExecutor); - if (processParamsExecutor != null) shutdownExecutor(processParamsExecutor); LOG.info("Function column fetcher thread finished for catalog: " + catalogParam); } }; - Thread fetcherThread = - new Thread(functionColumnFetcher, "getFunctionColumns-fetcher-" + catalog); + Future fetcherFuture = connection.getExecutorService().submit(functionColumnFetcher); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Future[] {fetcherFuture}); - fetcherThread.start(); LOG.info("Started background thread for getFunctionColumns for catalog: " + catalog); return resultSet; } @@ -4432,37 +4248,27 @@ List listMatchingFunctionIdsFromDatasets( return functionIdsToGet; } - void submitFunctionParameterProcessingJobs( + void processFunctionParametersSequentially( List fullFunctions, Pattern columnNameRegex, List collectedResults, FieldList resultSchemaFields, - ExecutorService processParamsExecutor, - List> outParameterProcessingFutures, BigQueryJdbcCustomLogger logger) throws InterruptedException { - logger.fine("Submitting parameter processing jobs for %d functions.", fullFunctions.size()); + logger.fine("Processing parameter jobs sequentially for %d functions.", fullFunctions.size()); for (Routine fullFunction : fullFunctions) { if (Thread.currentThread().isInterrupted()) { - logger.warning("Interrupted during submission of function parameter processing tasks."); + logger.warning("Interrupted during function parameter processing."); throw new InterruptedException( - "Interrupted while submitting function parameter processing jobs"); + "Interrupted while processing function parameters sequentially"); } if (fullFunction != null) { String routineType = fullFunction.getRoutineType(); if ("SCALAR_FUNCTION".equalsIgnoreCase(routineType) || "TABLE_FUNCTION".equalsIgnoreCase(routineType)) { - final Routine finalFullFunction = fullFunction; - Future processFuture = - processParamsExecutor.submit( - () -> - processFunctionParametersAndReturnValue( - finalFullFunction, - columnNameRegex, - collectedResults, - resultSchemaFields)); - outParameterProcessingFutures.add(processFuture); + processFunctionParametersAndReturnValue( + fullFunction, columnNameRegex, collectedResults, resultSchemaFields); } else { logger.warning( "Routine " @@ -4475,10 +4281,6 @@ void submitFunctionParameterProcessingJobs( } } } - logger.fine( - "Finished submitting " - + outParameterProcessingFutures.size() - + " processFunctionParametersAndReturnValue tasks."); } void processFunctionParametersAndReturnValue( @@ -5277,80 +5079,4 @@ private void loadDriverVersionProperties() { throw ex; } } - - // TODO(keshav): This is a temporary compatibility bridge to wrap raw Threads into Futures. - // This should be removed when BigQueryDatabaseMetaData is refactored to use the ExecutorService - // directly. - static Future[] wrapThread(final Thread thread) { - if (thread == null) { - return null; - } - return new Future[] { - new Future() { - private volatile boolean cancelled = false; - - @Override - public synchronized boolean cancel(boolean mayInterruptIfRunning) { - if (cancelled || thread.getState() == Thread.State.TERMINATED) { - return false; - } - cancelled = true; - if (mayInterruptIfRunning) { - thread.interrupt(); - } - return true; - } - - @Override - public boolean isCancelled() { - return cancelled; - } - - @Override - public boolean isDone() { - return cancelled || thread.getState() == Thread.State.TERMINATED; - } - - @Override - public Object get() throws InterruptedException, CancellationException { - try { - return get(365, TimeUnit.DAYS); - } catch (TimeoutException e) { - throw new RuntimeException(e); - } - } - - @Override - public Object get(long timeout, TimeUnit unit) - throws InterruptedException, CancellationException, TimeoutException { - if (isCancelled()) { - throw new CancellationException(); - } - long remainingNanos = unit.toNanos(timeout); - long deadline = System.nanoTime() + remainingNanos; - while (thread.getState() != Thread.State.TERMINATED) { - if (isCancelled()) { - throw new CancellationException(); - } - if (remainingNanos <= 0) { - throw new TimeoutException(); - } - long remainingMillis = TimeUnit.NANOSECONDS.toMillis(remainingNanos); - if (remainingMillis == 0) { - remainingMillis = 1; - } - - long delay = Math.min(remainingMillis, 50); - if (thread.getState() == Thread.State.NEW) { - Thread.sleep(delay); - } else { - thread.join(delay); - } - remainingNanos = deadline - System.nanoTime(); - } - return null; - } - } - }; - } } diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java index f756c636d847..3a97f41924a8 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java @@ -44,14 +44,12 @@ import java.sql.Types; import java.util.*; import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -62,20 +60,31 @@ public class BigQueryDatabaseMetaDataTest { private BigQueryConnection bigQueryConnection; private BigQueryDatabaseMetaData dbMetadata; private BigQuery bigqueryClient; + private ExecutorService metadataExecutor; @BeforeEach public void setUp() throws SQLException { bigQueryConnection = mock(BigQueryConnection.class); bigqueryClient = mock(BigQuery.class); Statement mockStatement = mock(Statement.class); + metadataExecutor = Executors.newCachedThreadPool(); when(bigQueryConnection.getConnectionUrl()).thenReturn("jdbc:bigquery://test-project"); when(bigQueryConnection.getBigQuery()).thenReturn(bigqueryClient); when(bigQueryConnection.createStatement()).thenReturn(mockStatement); + when(bigQueryConnection.getMetadataExecutor()).thenReturn(metadataExecutor); + when(bigQueryConnection.getExecutorService()).thenReturn(metadataExecutor); dbMetadata = new BigQueryDatabaseMetaData(bigQueryConnection); } + @AfterEach + public void tearDown() { + if (metadataExecutor != null) { + metadataExecutor.shutdownNow(); + } + } + private Table mockBigQueryTable( String project, String dataset, String table, TableDefinition.Type type, String description) { Table mockTable = mock(Table.class); @@ -1598,7 +1607,7 @@ public void testListMatchingProcedureIdsFromDatasets() throws Exception { } @Test - public void testSubmitProcedureArgumentProcessingJobs_Basic() throws InterruptedException { + public void testProcessProcedureArgumentsSequentially_Basic() throws InterruptedException { String catalog = "p"; String schemaName = "d"; RoutineArgument arg1 = mockRoutineArgument("arg1_name", StandardSQLTypeName.STRING, "IN"); @@ -1623,32 +1632,13 @@ public void testSubmitProcedureArgumentProcessingJobs_Basic() throws Interrupted Schema resultSchema = dbMetadata.defineGetProcedureColumnsSchema(); FieldList resultSchemaFields = resultSchema.getFields(); - ExecutorService mockExecutor = mock(ExecutorService.class); - List> processingTaskFutures = new ArrayList<>(); + dbMetadata.processProcedureArgumentsSequentially( + fullRoutines, columnNameRegex, collectedResults, resultSchemaFields, dbMetadata.LOG); - // Capture the runnable submitted to the executor - List submittedRunnables = new ArrayList<>(); - doAnswer( - invocation -> { - Runnable runnable = invocation.getArgument(0); - submittedRunnables.add(runnable); - Future future = mock(Future.class); - return future; - }) - .when(mockExecutor) - .submit(any(Runnable.class)); - - dbMetadata.submitProcedureArgumentProcessingJobs( - fullRoutines, - columnNameRegex, - collectedResults, - resultSchemaFields, - mockExecutor, - processingTaskFutures, - dbMetadata.LOG); - - verify(mockExecutor, times(2)).submit(any(Runnable.class)); - assertEquals(2, processingTaskFutures.size()); + // Only proc1 has arguments, so collectedResults should contain 1 row. + assertEquals(1, collectedResults.size()); + FieldValueList row = collectedResults.get(0); + assertEquals("arg1_name", row.get("COLUMN_NAME").getStringValue()); } @Test @@ -3436,125 +3426,4 @@ public void testGetSchemas_WithoutProjectDiscovery() throws SQLException { verify(bigqueryClient, never()) .listDatasets(eq("discovered-1"), any(BigQuery.DatasetListOption.class)); } - - @Test - public void testWrapThread_NullThread() { - assertNull(BigQueryDatabaseMetaData.wrapThread(null)); - } - - @Test - public void testWrapThread_BasicLifecycle() throws Exception { - CountDownLatch startLatch = new CountDownLatch(1); - CountDownLatch finishLatch = new CountDownLatch(1); - Thread t = - new Thread( - () -> { - try { - startLatch.countDown(); - finishLatch.await(); - } catch (InterruptedException e) { - // ignore - } - }); - - Future[] futures = BigQueryDatabaseMetaData.wrapThread(t); - assertNotNull(futures); - assertEquals(1, futures.length); - Future f = futures[0]; - - // Thread is NEW (not started yet). - assertFalse(f.isDone()); - assertFalse(f.isCancelled()); - - t.start(); - startLatch.await(); - - // Thread is running. - assertFalse(f.isDone()); - assertFalse(f.isCancelled()); - - finishLatch.countDown(); - t.join(); - - // Thread is terminated. - assertTrue(f.isDone()); - assertFalse(f.isCancelled()); - assertNull(f.get()); - } - - @Test - public void testWrapThread_CancelBeforeStart() throws Exception { - Thread t = - new Thread( - () -> { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // ignore - } - }); - - Future f = BigQueryDatabaseMetaData.wrapThread(t)[0]; - assertTrue(f.cancel(true)); - assertTrue(f.isCancelled()); - assertTrue(f.isDone()); - - // cancel on already cancelled should return false - assertFalse(f.cancel(true)); - - assertThrows(CancellationException.class, () -> f.get()); - assertThrows(CancellationException.class, () -> f.get(1, TimeUnit.SECONDS)); - } - - @Test - public void testWrapThread_CancelRunningWithInterrupt() throws Exception { - CountDownLatch startLatch = new CountDownLatch(1); - CountDownLatch interruptedLatch = new CountDownLatch(1); - Thread t = - new Thread( - () -> { - startLatch.countDown(); - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - interruptedLatch.countDown(); - } - }); - - t.start(); - startLatch.await(); - - Future f = BigQueryDatabaseMetaData.wrapThread(t)[0]; - assertTrue(f.cancel(true)); - assertTrue(f.isCancelled()); - assertTrue(f.isDone()); - - assertTrue(interruptedLatch.await(5, TimeUnit.SECONDS)); - assertThrows(CancellationException.class, () -> f.get()); - } - - @Test - public void testWrapThread_GetTimeout() throws Exception { - CountDownLatch startLatch = new CountDownLatch(1); - Thread t = - new Thread( - () -> { - startLatch.countDown(); - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - // ignore - } - }); - - t.start(); - startLatch.await(); - - Future f = BigQueryDatabaseMetaData.wrapThread(t)[0]; - assertThrows(TimeoutException.class, () -> f.get(100, TimeUnit.MILLISECONDS)); - - // Cleanup: stop the thread - t.interrupt(); - t.join(); - } }