Skip to content

Commit

Permalink
initial commits
Browse files Browse the repository at this point in the history
  • Loading branch information
AnkitCLI committed Jun 5, 2023
1 parent 53cf01d commit 8cdbadd
Show file tree
Hide file tree
Showing 9 changed files with 259 additions and 204 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin;

import io.cdap.e2e.utils.PluginPropertyUtils;
import org.junit.Assert;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.TimeZone;

/**
* CloudSQLPostgreSQL client.
*/

public class CloudSqlPostgreSqlClient {
public static Connection getCloudSqlConnection() throws ClassNotFoundException, SQLException {
Class.forName("org.postgresql.Driver");
String database = PluginPropertyUtils.pluginProp("databaseName");
String instanceConnectionName = System.getenv("CLOUDSQL_POSTGRESQL_CONNECTION_NAME");
String username = System.getenv("CLOUDSQL_POSTGRESQL_USERNAME");
String password = System.getenv("CLOUDSQL_POSTGRESQL_PASSWORD");
String jdbcUrl = String.format(PluginPropertyUtils.pluginProp("URL"), database, instanceConnectionName, username, password);
Connection connection = DriverManager.getConnection(jdbcUrl);
return connection;
}

public static int countRecord(String table, String schema) throws SQLException, ClassNotFoundException {
String countQuery = "SELECT COUNT(*) as total FROM " + schema + "." + table;
try (Connection connection = getCloudSqlConnection();
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery(countQuery)) {
int num = 0;
while (rs.next()) {
num = (rs.getInt(1));
}
return num;
}
}

/**
* Extracts entire data from source and target tables.
* @param sourceTable table at the source side
* @param targetTable table at the sink side
* @return true if the values in source and target side are equal
*/
public static boolean validateRecordValues(String sourceTable, String targetTable, String schema)
throws SQLException, ClassNotFoundException {
String getSourceQuery = "SELECT * FROM " + schema + "." + sourceTable;
String getTargetQuery = "SELECT * FROM " + schema + "." + targetTable;
try (Connection connection = getCloudSqlConnection()) {
connection.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
Statement statement1 = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE,
ResultSet.HOLD_CURSORS_OVER_COMMIT);
Statement statement2 = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE,
ResultSet.HOLD_CURSORS_OVER_COMMIT);
ResultSet rsSource = statement1.executeQuery(getSourceQuery);
ResultSet rsTarget = statement2.executeQuery(getTargetQuery);
return compareResultSetData(rsSource, rsTarget);
}
}

/**
* Compares the result Set data in source table and sink table..
* @param rsSource result set of the source table data
* @param rsTarget result set of the target table data
* @return true if rsSource matches rsTarget
*/
public static boolean compareResultSetData(ResultSet rsSource, ResultSet rsTarget) throws SQLException {
ResultSetMetaData mdSource = rsSource.getMetaData();
ResultSetMetaData mdTarget = rsTarget.getMetaData();
int columnCountSource = mdSource.getColumnCount();
int columnCountTarget = mdTarget.getColumnCount();
Assert.assertEquals("Number of columns in source and target are not equal",
columnCountSource, columnCountTarget);
while (rsSource.next() && rsTarget.next()) {
int currentColumnCount = 1;
while (currentColumnCount <= columnCountSource) {
String columnTypeName = mdSource.getColumnTypeName(currentColumnCount);
int columnType = mdSource.getColumnType(currentColumnCount);
String columnName = mdSource.getColumnName(currentColumnCount);
if (columnType == Types.TIMESTAMP) {
GregorianCalendar gc = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
gc.setGregorianChange(new Date(Long.MIN_VALUE));
Timestamp sourceTS = rsSource.getTimestamp(currentColumnCount, gc);
Timestamp targetTS = rsTarget.getTimestamp(currentColumnCount, gc);
Assert.assertEquals(String.format("Different values found for column : %s", columnName), sourceTS, targetTS);
} else {
String sourceString = rsSource.getString(currentColumnCount);
String targetString = rsTarget.getString(currentColumnCount);
Assert.assertEquals(String.format("Different values found for column : %s", columnName),
sourceString, targetString);
}
currentColumnCount++;
}
}
Assert.assertFalse("Number of rows in Source table is greater than the number of rows in Target table",
rsSource.next());
Assert.assertFalse("Number of rows in Target table is greater than the number of rows in Source table",
rsTarget.next());
return true;
}

public static void createSourceTable(String sourceTable, String schema) throws SQLException, ClassNotFoundException {
try (Connection connection = getCloudSqlConnection();
Statement statement = connection.createStatement()) {
String datatypesColumns = PluginPropertyUtils.pluginProp("datatypesColumns");
String createSourceTableQuery = "CREATE TABLE " + schema + "." + sourceTable + datatypesColumns;
statement.executeUpdate(createSourceTableQuery);
System.out.println(createSourceTableQuery);

// Insert dummy data.
String datatypesValues = PluginPropertyUtils.pluginProp("datatypesValues");
String datatypesColumnsList = PluginPropertyUtils.pluginProp("datatypesColumnsList");
statement.executeUpdate("INSERT INTO " + schema + "." + sourceTable + " " + datatypesColumnsList + " " +
datatypesValues);
}
}

public static void createTargetTable(String targetTable, String schema) throws SQLException, ClassNotFoundException {
try (Connection connection = getCloudSqlConnection();
Statement statement = connection.createStatement()) {
String datatypesColumns = PluginPropertyUtils.pluginProp("datatypesColumns");
String createTargetTableQuery = "CREATE TABLE " + schema + "." + targetTable + " " + datatypesColumns;
statement.executeUpdate(createTargetTableQuery);
}
}

public static void createTargetPostgresqlTable(String targetTable, String schema) throws SQLException,
ClassNotFoundException {
try (Connection connection = getCloudSqlConnection();
Statement statement = connection.createStatement()) {
String datatypesColumns = PluginPropertyUtils.pluginProp("bigQueryDatatypesColumns");
String createTargetTableQuery = "CREATE TABLE " + schema + "." + targetTable + " " + datatypesColumns;
statement.executeUpdate(createTargetTableQuery);
}
}

public static void dropTables(String[] tables, String schema) throws SQLException, ClassNotFoundException {
try (Connection connection = getCloudSqlConnection();
Statement statement = connection.createStatement()) {
for (String table : tables) {
String dropTableQuery = "Drop Table " + schema + "." + table;
statement.executeUpdate(dropTableQuery);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import com.google.gson.JsonObject;
import io.cdap.e2e.utils.BigQueryClient;
import io.cdap.e2e.utils.PluginPropertyUtils;
import io.cdap.plugin.CloudSqlPostgreSqlClient;
import org.apache.spark.sql.types.Decimal;
import org.junit.Assert;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.time.*;
import java.util.Date;
import java.text.ParseException;
Expand All @@ -40,51 +42,52 @@
public class BQValidation {
static List<JsonObject> BigQueryResponse = new ArrayList<>();
static List<Object> bigQueryRows = new ArrayList<>();
static JsonObject json;

/**
* Extracts entire data from source and target tables.
*
* @param sourceTable table at the source side
* @param targetTable table at the sink side
* @return true if the values in source and target side are equal
*/
public static boolean validateDBToBQRecordValues(String schema, String sourceTable, String targetTable)
throws SQLException, ClassNotFoundException, IOException, InterruptedException {
throws SQLException, ClassNotFoundException, IOException, InterruptedException, ParseException {
getBigQueryTableData(targetTable, bigQueryRows);
for (Object rows : bigQueryRows) {
JsonObject json = new Gson().fromJson(String.valueOf(rows), JsonObject.class);
json = new Gson().fromJson(String.valueOf(rows), JsonObject.class);
BigQueryResponse.add(json);
}
String getSourceQuery = "SELECT * FROM " + schema + "." + sourceTable;
try (Connection connect = CloudSqlPostgreSqlClient.getCloudSqlConnection()) {
connect.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
Statement statement1 = connect.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE,
try (Connection connection = CloudSqlPostgreSqlClient.getCloudSqlConnection()) {
connection.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
Statement statement1 = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE,
ResultSet.HOLD_CURSORS_OVER_COMMIT);

ResultSet rsSource = statement1.executeQuery(getSourceQuery);
return compareResultSetandJsonData(rsSource, BigQueryResponse);
}
}

public static boolean validateBQToDBRecordValues(String schema, String sourceTable, String targetTable)
throws SQLException, ClassNotFoundException, IOException, InterruptedException {
throws SQLException, ClassNotFoundException, IOException, InterruptedException, ParseException {
getBigQueryTableData(sourceTable, bigQueryRows);
for (Object rows : bigQueryRows) {
JsonObject json = new Gson().fromJson(String.valueOf(rows), JsonObject.class);
json = new Gson().fromJson(String.valueOf(rows), JsonObject.class);
BigQueryResponse.add(json);
}
String getTargetQuery = "SELECT * FROM " + schema + "." + targetTable;
try (Connection connect = CloudSqlPostgreSqlClient.getCloudSqlConnection()) {
connect.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
Statement statement1 = connect.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE,
try (Connection connection = CloudSqlPostgreSqlClient.getCloudSqlConnection()) {
connection.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
Statement statement1 = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE,
ResultSet.HOLD_CURSORS_OVER_COMMIT);

ResultSet rsTarget = statement1.executeQuery(getTargetQuery);
return compareResultSetandJsonData(rsTarget, BigQueryResponse);
}
}

/**
* Retrieves the data from a specified BigQuery table and populates it into the provided list of objects.
*
* @param table The name of the BigQuery table to fetch data from.
* @param bigQueryRows The list to store the fetched BigQuery data.
*/
Expand All @@ -100,15 +103,14 @@ private static void getBigQueryTableData(String table, List<Object> bigQueryRows

/**
* Compares the data in the result set obtained from the Oracle database with the provided BigQuery JSON objects.
*
* @param rsSource The result set obtained from the Oracle database.
* @param bigQueryData The list of BigQuery JSON objects to compare with the result set data.
* @return True if the result set data matches the BigQuery data, false otherwise.
* @throws SQLException If an SQL error occurs during the result set operations.
* @throws ParseException If an error occurs while parsing the data.
*/
public static boolean compareResultSetandJsonData(ResultSet rsSource, List<JsonObject> bigQueryData)
throws SQLException {
throws SQLException, ParseException {
ResultSetMetaData mdSource = rsSource.getMetaData();
boolean result = false;
int columnCountSource = mdSource.getColumnCount();
Expand Down Expand Up @@ -161,6 +163,11 @@ public static boolean compareResultSetandJsonData(ResultSet rsSource, List<JsonO
break;

case Types.TIMESTAMP:
Timestamp sourceTS = rsSource.getTimestamp(columnName);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
Date parsedDate = dateFormat.parse(bigQueryData.get(jsonObjectIdx).get(columnName).getAsString());
Timestamp targetTs = new Timestamp(parsedDate.getTime());
Assert.assertEquals(sourceTS, targetTs);
break;

case Types.TIME:
Expand Down
Loading

0 comments on commit 8cdbadd

Please sign in to comment.