Skip to content

Commit

Permalink
Merge pull request cdapio#15226 from cdapio/CDAP-20722
Browse files Browse the repository at this point in the history
[CDAP-20722] Limit the number of idle connections and catch errors in aux calls
  • Loading branch information
itsankit-google authored Jul 11, 2023
2 parents b85ed13 + eb1d6da commit 9491abe
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,12 @@ public static final class Dataset {
public static final String DATA_STORAGE_SQL_JDBC_CONNECTION_URL = "data.storage.sql.jdbc.connection.url";
public static final String DATA_STORAGE_SQL_PROPERTY_PREFIX = "data.storage.sql.jdbc.property.";
public static final String DATA_STORAGE_SQL_CONNECTION_SIZE = "data.storage.sql.jdbc.connection.pool.size";
public static final String DATA_STORAGE_SQL_CONNECTION_IDLE_SIZE =
"data.storage.sql.jdbc.connection.pool.idle.size";
public static final String DATA_STORAGE_SQL_CONNECTION_IDLE_MILLIS =
"data.storage.sql.jdbc.connection.pool.idle.millis";
public static final String DATA_STORAGE_SQL_CONNECTION_IDLE_EVICTION_MILLIS =
"data.storage.sql.jdbc.connection.pool.idle.eviction.millis";
public static final String DATA_STORAGE_SQL_SCAN_FETCH_SIZE_ROWS = "data.storage.sql.scan.size.rows";
public static final String DATA_STORAGE_SQL_TRANSACTION_RUNNER_MAX_RETRIES =
"data.storage.sql.tx.runner.max.retries";
Expand Down
26 changes: 26 additions & 0 deletions cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,32 @@
</description>
</property>

<property>
<name>data.storage.sql.jdbc.connection.pool.idle.size</name>
<value>5</value>
<description>
The max number of idle connections for the sql connection pool.
</description>
</property>

<property>
<name>data.storage.sql.jdbc.connection.pool.idle.millis</name>
<value>300000</value>
<description>
The minimum amount of time a connection may sit idle in the pool
before it is eligible for eviction.
</description>
</property>

<property>
<name>data.storage.sql.jdbc.connection.pool.idle.eviction.millis</name>
<value>180000</value>
<description>
The minimum amount of time to sleep between runs of the
idle connection eviction thread..
</description>
</property>

<property>
<name>data.storage.sql.tx.runner.max.retries</name>
<value>20</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ public static DataSource createDataSource(CConfiguration cConf, SConfiguration s
poolableConnectionFactory);
poolableConnectionFactory.setPool(connectionPool);
connectionPool.setMaxTotal(cConf.getInt(Constants.Dataset.DATA_STORAGE_SQL_CONNECTION_SIZE));
connectionPool.setMaxIdle(
cConf.getInt(Constants.Dataset.DATA_STORAGE_SQL_CONNECTION_IDLE_SIZE));
connectionPool.setMinEvictableIdleTimeMillis(
cConf.getInt(Constants.Dataset.DATA_STORAGE_SQL_CONNECTION_IDLE_MILLIS));
connectionPool.setTimeBetweenEvictionRunsMillis(
cConf.getInt(Constants.Dataset.DATA_STORAGE_SQL_CONNECTION_IDLE_EVICTION_MILLIS));
PoolingDataSource<PoolableConnection> dataSource = new PoolingDataSource<>(connectionPool);
return new MetricsDataSource(dataSource, metricsCollectionService, connectionPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.spi.data.sql;

import com.google.common.base.Throwables;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.api.metrics.MetricsContext;
import io.cdap.cdap.common.conf.Constants;
Expand All @@ -25,6 +26,7 @@
import io.cdap.cdap.spi.data.transaction.TxRunnable;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -73,9 +75,12 @@ public void run(TxRunnable runnable) throws TransactionException {
this.scanFetchSize));
connection.commit();
} catch (Exception e) {
Throwable cause = e.getCause();
if (cause instanceof SQLException) {
rollback(connection, new SqlTransactionException((SQLException) cause, e));
List<Throwable> causes = Throwables.getCausalChain(e);
for (Throwable cause : causes) {
if (cause instanceof SQLException) {
rollback(connection, new SqlTransactionException((SQLException) cause, e));
break;
}
}
rollback(connection, new TransactionException("Failed to execute the sql queries.", e));
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.spi.data.sql;

import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService;
import io.cdap.cdap.spi.data.StructuredTableAdmin;
import io.cdap.cdap.spi.data.transaction.TransactionException;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;
import org.junit.Test;
import org.mockito.Mockito;

/**
* Unit Tests for SQL Transaction Runner.
*/
public class SqlTransactionRunnerTest {

@Test(expected = SqlTransactionException.class)
public void testSqlExceptionPropagation() throws SQLException, TransactionException {
StructuredTableAdmin mockTableAdmin = Mockito.mock(StructuredTableAdmin.class);
DataSource mockDataSource = Mockito.mock(DataSource.class);
Connection mockConnection = Mockito.mock(Connection.class);

SqlTransactionRunner sqlTransactionRunner = new SqlTransactionRunner(mockTableAdmin,
mockDataSource, new NoOpMetricsCollectionService(), false, 0);

Mockito.when(mockDataSource.getConnection()).thenReturn(mockConnection);
// throw an exception from setTransactionIsolation method
Mockito.doThrow(new RuntimeException("RuntimeException", new IOException("IOException",
new SQLException("SQLConnectionException", "08*"))))
.when(mockConnection).setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);

// run the test
sqlTransactionRunner.run(context -> {});
}

@Test(expected = SqlTransactionException.class)
public void testSqlException() throws SQLException, TransactionException {
StructuredTableAdmin mockTableAdmin = Mockito.mock(StructuredTableAdmin.class);
DataSource mockDataSource = Mockito.mock(DataSource.class);
Connection mockConnection = Mockito.mock(Connection.class);

SqlTransactionRunner sqlTransactionRunner = new SqlTransactionRunner(mockTableAdmin,
mockDataSource, new NoOpMetricsCollectionService(), false, 0);

Mockito.when(mockDataSource.getConnection()).thenReturn(mockConnection);
// throw an exception from setTransactionIsolation method
Mockito.doThrow(new SQLException("SQLConnectionException", "08*"))
.when(mockConnection).setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);

// run the test
sqlTransactionRunner.run(context -> {});
}

}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@
<unboundid.version>2.3.6</unboundid.version>
<zookeeper.version>3.4.5</zookeeper.version>
<embedded-postgres.version>1.3.1</embedded-postgres.version>
<dbcp.version>2.6.0</dbcp.version>
<dbcp.version>2.9.0</dbcp.version>
<jacoco.version>0.8.6</jacoco.version>
<!-- Overwrite Apache Ivy version to 2.5.1 for remediating CVE-2022-37866-->
<ivy.version>2.5.1</ivy.version>
Expand Down

0 comments on commit 9491abe

Please sign in to comment.