From 8cdbaddfd54c1edf641db691f3bdcf7b20429933 Mon Sep 17 00:00:00 2001 From: AnkitCLI Date: Mon, 5 Jun 2023 15:20:10 +0530 Subject: [PATCH] initial commits --- .../cdap/plugin/CloudSqlPostgreSqlClient.java | 170 +++++++++++++++++ .../cloudsqlpostgresql/BQValidation.java | 35 ++-- .../CloudSqlPostgreSqlClient.java | 174 ------------------ .../cloudsqlpostgresql/package-info.java | 20 ++ .../stepsdesign/CloudSqlPostgreSql.java | 4 +- .../stepsdesign/package-info.java | 21 +++ .../common/stepsdesign/TestSetUpHooks.java | 2 +- .../java/io/cdap/plugin/package-info.java | 20 ++ .../resources/pluginParameters.properties | 17 +- 9 files changed, 259 insertions(+), 204 deletions(-) create mode 100644 cloudsql-postgresql-plugin/src/e2e-test/java/io/cdap/plugin/CloudSqlPostgreSqlClient.java delete mode 100644 cloudsql-postgresql-plugin/src/e2e-test/java/io/cdap/plugin/cloudsqlpostgresql/CloudSqlPostgreSqlClient.java create mode 100644 cloudsql-postgresql-plugin/src/e2e-test/java/io/cdap/plugin/cloudsqlpostgresql/package-info.java create mode 100644 cloudsql-postgresql-plugin/src/e2e-test/java/io/cdap/plugin/cloudsqlpostgresql/stepsdesign/package-info.java create mode 100644 cloudsql-postgresql-plugin/src/e2e-test/java/io/cdap/plugin/package-info.java diff --git a/cloudsql-postgresql-plugin/src/e2e-test/java/io/cdap/plugin/CloudSqlPostgreSqlClient.java b/cloudsql-postgresql-plugin/src/e2e-test/java/io/cdap/plugin/CloudSqlPostgreSqlClient.java new file mode 100644 index 000000000..5389474de --- /dev/null +++ b/cloudsql-postgresql-plugin/src/e2e-test/java/io/cdap/plugin/CloudSqlPostgreSqlClient.java @@ -0,0 +1,170 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin; + +import io.cdap.e2e.utils.PluginPropertyUtils; +import org.junit.Assert; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.TimeZone; + +/** + * CloudSQLPostgreSQL client. + */ + +public class CloudSqlPostgreSqlClient { + public static Connection getCloudSqlConnection() throws ClassNotFoundException, SQLException { + Class.forName("org.postgresql.Driver"); + String database = PluginPropertyUtils.pluginProp("databaseName"); + String instanceConnectionName = System.getenv("CLOUDSQL_POSTGRESQL_CONNECTION_NAME"); + String username = System.getenv("CLOUDSQL_POSTGRESQL_USERNAME"); + String password = System.getenv("CLOUDSQL_POSTGRESQL_PASSWORD"); + String jdbcUrl = String.format(PluginPropertyUtils.pluginProp("URL"), database, instanceConnectionName, username, password); + Connection connection = DriverManager.getConnection(jdbcUrl); + return connection; + } + + public static int countRecord(String table, String schema) throws SQLException, ClassNotFoundException { + String countQuery = "SELECT COUNT(*) as total FROM " + schema + "." + table; + try (Connection connection = getCloudSqlConnection(); + Statement statement = connection.createStatement(); + ResultSet rs = statement.executeQuery(countQuery)) { + int num = 0; + while (rs.next()) { + num = (rs.getInt(1)); + } + return num; + } + } + + /** + * Extracts entire data from source and target tables. + * @param sourceTable table at the source side + * @param targetTable table at the sink side + * @return true if the values in source and target side are equal + */ + public static boolean validateRecordValues(String sourceTable, String targetTable, String schema) + throws SQLException, ClassNotFoundException { + String getSourceQuery = "SELECT * FROM " + schema + "." + sourceTable; + String getTargetQuery = "SELECT * FROM " + schema + "." + targetTable; + try (Connection connection = getCloudSqlConnection()) { + connection.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT); + Statement statement1 = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE, + ResultSet.HOLD_CURSORS_OVER_COMMIT); + Statement statement2 = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE, + ResultSet.HOLD_CURSORS_OVER_COMMIT); + ResultSet rsSource = statement1.executeQuery(getSourceQuery); + ResultSet rsTarget = statement2.executeQuery(getTargetQuery); + return compareResultSetData(rsSource, rsTarget); + } + } + + /** + * Compares the result Set data in source table and sink table.. + * @param rsSource result set of the source table data + * @param rsTarget result set of the target table data + * @return true if rsSource matches rsTarget + */ + public static boolean compareResultSetData(ResultSet rsSource, ResultSet rsTarget) throws SQLException { + ResultSetMetaData mdSource = rsSource.getMetaData(); + ResultSetMetaData mdTarget = rsTarget.getMetaData(); + int columnCountSource = mdSource.getColumnCount(); + int columnCountTarget = mdTarget.getColumnCount(); + Assert.assertEquals("Number of columns in source and target are not equal", + columnCountSource, columnCountTarget); + while (rsSource.next() && rsTarget.next()) { + int currentColumnCount = 1; + while (currentColumnCount <= columnCountSource) { + String columnTypeName = mdSource.getColumnTypeName(currentColumnCount); + int columnType = mdSource.getColumnType(currentColumnCount); + String columnName = mdSource.getColumnName(currentColumnCount); + if (columnType == Types.TIMESTAMP) { + GregorianCalendar gc = new GregorianCalendar(TimeZone.getTimeZone("UTC")); + gc.setGregorianChange(new Date(Long.MIN_VALUE)); + Timestamp sourceTS = rsSource.getTimestamp(currentColumnCount, gc); + Timestamp targetTS = rsTarget.getTimestamp(currentColumnCount, gc); + Assert.assertEquals(String.format("Different values found for column : %s", columnName), sourceTS, targetTS); + } else { + String sourceString = rsSource.getString(currentColumnCount); + String targetString = rsTarget.getString(currentColumnCount); + Assert.assertEquals(String.format("Different values found for column : %s", columnName), + sourceString, targetString); + } + currentColumnCount++; + } + } + Assert.assertFalse("Number of rows in Source table is greater than the number of rows in Target table", + rsSource.next()); + Assert.assertFalse("Number of rows in Target table is greater than the number of rows in Source table", + rsTarget.next()); + return true; + } + + public static void createSourceTable(String sourceTable, String schema) throws SQLException, ClassNotFoundException { + try (Connection connection = getCloudSqlConnection(); + Statement statement = connection.createStatement()) { + String datatypesColumns = PluginPropertyUtils.pluginProp("datatypesColumns"); + String createSourceTableQuery = "CREATE TABLE " + schema + "." + sourceTable + datatypesColumns; + statement.executeUpdate(createSourceTableQuery); + System.out.println(createSourceTableQuery); + + // Insert dummy data. + String datatypesValues = PluginPropertyUtils.pluginProp("datatypesValues"); + String datatypesColumnsList = PluginPropertyUtils.pluginProp("datatypesColumnsList"); + statement.executeUpdate("INSERT INTO " + schema + "." + sourceTable + " " + datatypesColumnsList + " " + + datatypesValues); + } + } + + public static void createTargetTable(String targetTable, String schema) throws SQLException, ClassNotFoundException { + try (Connection connection = getCloudSqlConnection(); + Statement statement = connection.createStatement()) { + String datatypesColumns = PluginPropertyUtils.pluginProp("datatypesColumns"); + String createTargetTableQuery = "CREATE TABLE " + schema + "." + targetTable + " " + datatypesColumns; + statement.executeUpdate(createTargetTableQuery); + } + } + + public static void createTargetPostgresqlTable(String targetTable, String schema) throws SQLException, + ClassNotFoundException { + try (Connection connection = getCloudSqlConnection(); + Statement statement = connection.createStatement()) { + String datatypesColumns = PluginPropertyUtils.pluginProp("bigQueryDatatypesColumns"); + String createTargetTableQuery = "CREATE TABLE " + schema + "." + targetTable + " " + datatypesColumns; + statement.executeUpdate(createTargetTableQuery); + } + } + + public static void dropTables(String[] tables, String schema) throws SQLException, ClassNotFoundException { + try (Connection connection = getCloudSqlConnection(); + Statement statement = connection.createStatement()) { + for (String table : tables) { + String dropTableQuery = "Drop Table " + schema + "." + table; + statement.executeUpdate(dropTableQuery); + } + } + } +} diff --git a/cloudsql-postgresql-plugin/src/e2e-test/java/io/cdap/plugin/cloudsqlpostgresql/BQValidation.java b/cloudsql-postgresql-plugin/src/e2e-test/java/io/cdap/plugin/cloudsqlpostgresql/BQValidation.java index 86214b9bc..fd6069a1b 100644 --- a/cloudsql-postgresql-plugin/src/e2e-test/java/io/cdap/plugin/cloudsqlpostgresql/BQValidation.java +++ b/cloudsql-postgresql-plugin/src/e2e-test/java/io/cdap/plugin/cloudsqlpostgresql/BQValidation.java @@ -21,6 +21,7 @@ import com.google.gson.JsonObject; import io.cdap.e2e.utils.BigQueryClient; import io.cdap.e2e.utils.PluginPropertyUtils; +import io.cdap.plugin.CloudSqlPostgreSqlClient; import org.apache.spark.sql.types.Decimal; import org.junit.Assert; @@ -28,6 +29,7 @@ import java.math.BigDecimal; import java.math.RoundingMode; import java.sql.*; +import java.text.SimpleDateFormat; import java.time.*; import java.util.Date; import java.text.ParseException; @@ -40,51 +42,52 @@ public class BQValidation { static List BigQueryResponse = new ArrayList<>(); static List bigQueryRows = new ArrayList<>(); + static JsonObject json; /** * Extracts entire data from source and target tables. - * * @param sourceTable table at the source side * @param targetTable table at the sink side * @return true if the values in source and target side are equal */ public static boolean validateDBToBQRecordValues(String schema, String sourceTable, String targetTable) - throws SQLException, ClassNotFoundException, IOException, InterruptedException { + throws SQLException, ClassNotFoundException, IOException, InterruptedException, ParseException { getBigQueryTableData(targetTable, bigQueryRows); for (Object rows : bigQueryRows) { - JsonObject json = new Gson().fromJson(String.valueOf(rows), JsonObject.class); + json = new Gson().fromJson(String.valueOf(rows), JsonObject.class); BigQueryResponse.add(json); } String getSourceQuery = "SELECT * FROM " + schema + "." + sourceTable; - try (Connection connect = CloudSqlPostgreSqlClient.getCloudSqlConnection()) { - connect.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT); - Statement statement1 = connect.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE, + try (Connection connection = CloudSqlPostgreSqlClient.getCloudSqlConnection()) { + connection.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT); + Statement statement1 = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE, ResultSet.HOLD_CURSORS_OVER_COMMIT); ResultSet rsSource = statement1.executeQuery(getSourceQuery); return compareResultSetandJsonData(rsSource, BigQueryResponse); } } + public static boolean validateBQToDBRecordValues(String schema, String sourceTable, String targetTable) - throws SQLException, ClassNotFoundException, IOException, InterruptedException { + throws SQLException, ClassNotFoundException, IOException, InterruptedException, ParseException { getBigQueryTableData(sourceTable, bigQueryRows); for (Object rows : bigQueryRows) { - JsonObject json = new Gson().fromJson(String.valueOf(rows), JsonObject.class); + json = new Gson().fromJson(String.valueOf(rows), JsonObject.class); BigQueryResponse.add(json); } String getTargetQuery = "SELECT * FROM " + schema + "." + targetTable; - try (Connection connect = CloudSqlPostgreSqlClient.getCloudSqlConnection()) { - connect.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT); - Statement statement1 = connect.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE, + try (Connection connection = CloudSqlPostgreSqlClient.getCloudSqlConnection()) { + connection.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT); + Statement statement1 = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE, ResultSet.HOLD_CURSORS_OVER_COMMIT); ResultSet rsTarget = statement1.executeQuery(getTargetQuery); return compareResultSetandJsonData(rsTarget, BigQueryResponse); } } + /** * Retrieves the data from a specified BigQuery table and populates it into the provided list of objects. - * * @param table The name of the BigQuery table to fetch data from. * @param bigQueryRows The list to store the fetched BigQuery data. */ @@ -100,7 +103,6 @@ private static void getBigQueryTableData(String table, List bigQueryRows /** * Compares the data in the result set obtained from the Oracle database with the provided BigQuery JSON objects. - * * @param rsSource The result set obtained from the Oracle database. * @param bigQueryData The list of BigQuery JSON objects to compare with the result set data. * @return True if the result set data matches the BigQuery data, false otherwise. @@ -108,7 +110,7 @@ private static void getBigQueryTableData(String table, List bigQueryRows * @throws ParseException If an error occurs while parsing the data. */ public static boolean compareResultSetandJsonData(ResultSet rsSource, List bigQueryData) - throws SQLException { + throws SQLException, ParseException { ResultSetMetaData mdSource = rsSource.getMetaData(); boolean result = false; int columnCountSource = mdSource.getColumnCount(); @@ -161,6 +163,11 @@ public static boolean compareResultSetandJsonData(ResultSet rsSource, List