From eb1d6da192023a1237c5a23d9d6927426ddc97de Mon Sep 17 00:00:00 2001 From: itsankit-google Date: Mon, 10 Jul 2023 22:58:19 +0530 Subject: [PATCH] Limit the number of idle connections and catch errors in aux calls --- .../io/cdap/cdap/common/conf/Constants.java | 6 ++ .../src/main/resources/cdap-default.xml | 26 +++++++ .../data/sql/PostgreSqlStorageProvider.java | 6 ++ .../spi/data/sql/SqlTransactionRunner.java | 11 ++- .../data/sql/SqlTransactionRunnerTest.java | 71 +++++++++++++++++++ pom.xml | 2 +- 6 files changed, 118 insertions(+), 4 deletions(-) create mode 100644 cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/sql/SqlTransactionRunnerTest.java diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index 222f4b64fffc..7df591afb0c9 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -723,6 +723,12 @@ public static final class Dataset { public static final String DATA_STORAGE_SQL_JDBC_CONNECTION_URL = "data.storage.sql.jdbc.connection.url"; public static final String DATA_STORAGE_SQL_PROPERTY_PREFIX = "data.storage.sql.jdbc.property."; public static final String DATA_STORAGE_SQL_CONNECTION_SIZE = "data.storage.sql.jdbc.connection.pool.size"; + public static final String DATA_STORAGE_SQL_CONNECTION_IDLE_SIZE = + "data.storage.sql.jdbc.connection.pool.idle.size"; + public static final String DATA_STORAGE_SQL_CONNECTION_IDLE_MILLIS = + "data.storage.sql.jdbc.connection.pool.idle.millis"; + public static final String DATA_STORAGE_SQL_CONNECTION_IDLE_EVICTION_MILLIS = + "data.storage.sql.jdbc.connection.pool.idle.eviction.millis"; public static final String DATA_STORAGE_SQL_SCAN_FETCH_SIZE_ROWS = "data.storage.sql.scan.size.rows"; public static final String DATA_STORAGE_SQL_TRANSACTION_RUNNER_MAX_RETRIES = "data.storage.sql.tx.runner.max.retries"; diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index ad52ead56ba3..4b29f3a4a87d 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -1217,6 +1217,32 @@ + + data.storage.sql.jdbc.connection.pool.idle.size + 5 + + The max number of idle connections for the sql connection pool. + + + + + data.storage.sql.jdbc.connection.pool.idle.millis + 300000 + + The minimum amount of time a connection may sit idle in the pool + before it is eligible for eviction. + + + + + data.storage.sql.jdbc.connection.pool.idle.eviction.millis + 180000 + + The minimum amount of time to sleep between runs of the + idle connection eviction thread.. + + + data.storage.sql.tx.runner.max.retries 20 diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStorageProvider.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStorageProvider.java index f35eb5d17412..a507e0fc2c98 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStorageProvider.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStorageProvider.java @@ -135,6 +135,12 @@ public static DataSource createDataSource(CConfiguration cConf, SConfiguration s poolableConnectionFactory); poolableConnectionFactory.setPool(connectionPool); connectionPool.setMaxTotal(cConf.getInt(Constants.Dataset.DATA_STORAGE_SQL_CONNECTION_SIZE)); + connectionPool.setMaxIdle( + cConf.getInt(Constants.Dataset.DATA_STORAGE_SQL_CONNECTION_IDLE_SIZE)); + connectionPool.setMinEvictableIdleTimeMillis( + cConf.getInt(Constants.Dataset.DATA_STORAGE_SQL_CONNECTION_IDLE_MILLIS)); + connectionPool.setTimeBetweenEvictionRunsMillis( + cConf.getInt(Constants.Dataset.DATA_STORAGE_SQL_CONNECTION_IDLE_EVICTION_MILLIS)); PoolingDataSource dataSource = new PoolingDataSource<>(connectionPool); return new MetricsDataSource(dataSource, metricsCollectionService, connectionPool); } diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/SqlTransactionRunner.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/SqlTransactionRunner.java index 944dd6e16259..05ee0e39086d 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/SqlTransactionRunner.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/SqlTransactionRunner.java @@ -16,6 +16,7 @@ package io.cdap.cdap.spi.data.sql; +import com.google.common.base.Throwables; import io.cdap.cdap.api.metrics.MetricsCollectionService; import io.cdap.cdap.api.metrics.MetricsContext; import io.cdap.cdap.common.conf.Constants; @@ -25,6 +26,7 @@ import io.cdap.cdap.spi.data.transaction.TxRunnable; import java.sql.Connection; import java.sql.SQLException; +import java.util.List; import javax.sql.DataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,9 +75,12 @@ public void run(TxRunnable runnable) throws TransactionException { this.scanFetchSize)); connection.commit(); } catch (Exception e) { - Throwable cause = e.getCause(); - if (cause instanceof SQLException) { - rollback(connection, new SqlTransactionException((SQLException) cause, e)); + List causes = Throwables.getCausalChain(e); + for (Throwable cause : causes) { + if (cause instanceof SQLException) { + rollback(connection, new SqlTransactionException((SQLException) cause, e)); + break; + } } rollback(connection, new TransactionException("Failed to execute the sql queries.", e)); } finally { diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/sql/SqlTransactionRunnerTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/sql/SqlTransactionRunnerTest.java new file mode 100644 index 000000000000..4fee08a8fe2c --- /dev/null +++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/sql/SqlTransactionRunnerTest.java @@ -0,0 +1,71 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.spi.data.sql; + +import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService; +import io.cdap.cdap.spi.data.StructuredTableAdmin; +import io.cdap.cdap.spi.data.transaction.TransactionException; +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import javax.sql.DataSource; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Unit Tests for SQL Transaction Runner. + */ +public class SqlTransactionRunnerTest { + + @Test(expected = SqlTransactionException.class) + public void testSqlExceptionPropagation() throws SQLException, TransactionException { + StructuredTableAdmin mockTableAdmin = Mockito.mock(StructuredTableAdmin.class); + DataSource mockDataSource = Mockito.mock(DataSource.class); + Connection mockConnection = Mockito.mock(Connection.class); + + SqlTransactionRunner sqlTransactionRunner = new SqlTransactionRunner(mockTableAdmin, + mockDataSource, new NoOpMetricsCollectionService(), false, 0); + + Mockito.when(mockDataSource.getConnection()).thenReturn(mockConnection); + // throw an exception from setTransactionIsolation method + Mockito.doThrow(new RuntimeException("RuntimeException", new IOException("IOException", + new SQLException("SQLConnectionException", "08*")))) + .when(mockConnection).setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ); + + // run the test + sqlTransactionRunner.run(context -> {}); + } + + @Test(expected = SqlTransactionException.class) + public void testSqlException() throws SQLException, TransactionException { + StructuredTableAdmin mockTableAdmin = Mockito.mock(StructuredTableAdmin.class); + DataSource mockDataSource = Mockito.mock(DataSource.class); + Connection mockConnection = Mockito.mock(Connection.class); + + SqlTransactionRunner sqlTransactionRunner = new SqlTransactionRunner(mockTableAdmin, + mockDataSource, new NoOpMetricsCollectionService(), false, 0); + + Mockito.when(mockDataSource.getConnection()).thenReturn(mockConnection); + // throw an exception from setTransactionIsolation method + Mockito.doThrow(new SQLException("SQLConnectionException", "08*")) + .when(mockConnection).setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ); + + // run the test + sqlTransactionRunner.run(context -> {}); + } + +} diff --git a/pom.xml b/pom.xml index 8081c16094db..1e7583a725b6 100644 --- a/pom.xml +++ b/pom.xml @@ -177,7 +177,7 @@ 2.3.6 3.4.5 1.3.1 - 2.6.0 + 2.9.0 0.8.6 2.5.1