Skip to content

Commit

Permalink
Merge pull request #686 from javiertuya/680-db-sql-executor
Browse files Browse the repository at this point in the history
ISSUE-680 # Add the DB SQL Executor to import data from CSV and execute SQL statements
  • Loading branch information
authorjapps authored Dec 22, 2024
2 parents c86bb83 + ce8f2aa commit 9d481ea
Show file tree
Hide file tree
Showing 25 changed files with 1,363 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,8 @@ jobs:
- name: Running Kafka
run: docker-compose -f docker/compose/kafka-schema-registry.yml up -d && sleep 10

- name: Running PostgreSQL (to test DB SQL Executor)
run: docker-compose -f docker/compose/pg_compose.yml up -d

- name: Building and testing the changes
run: mvn clean test
9 changes: 9 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,20 @@
<artifactId>micro-simulator</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<!--<scope>test</scope>--> <!-- Make it available to dependent projects. Hence commented -->
</dependency>
<dependency>
<groupId>com.aventstack</groupId>
<artifactId>extentreports</artifactId>
Expand Down
135 changes: 135 additions & 0 deletions core/src/main/java/org/jsmart/zerocode/core/db/DbCsvLoader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package org.jsmart.zerocode.core.db;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.univocity.parsers.csv.CsvParser;

/**
* Data loading in the database from a CSV external source
*/
/**
 * Data loading in the database from a CSV external source.
 *
 * <p>Parses a list of CSV lines, converts each field to the column type of the
 * target table and inserts one row per line using a parameterized INSERT.
 * All failures are reported as {@link RuntimeException} with a row-level message.
 */
class DbCsvLoader {
    private static final Logger LOGGER = LoggerFactory.getLogger(DbCsvLoader.class);
    private final Connection conn;
    private final CsvParser csvParser;

    public DbCsvLoader(Connection conn, CsvParser csvParser) {
        this.conn = conn;
        this.csvParser = csvParser;
    }

    /**
     * Loads rows in CSV format (csvLines) into a table in the database
     * and returns the total number of rows inserted.
     *
     * @param table       target table name (also used in the generated INSERT; trusted input)
     * @param csvLines    raw CSV lines; may be null or empty (returns 0)
     * @param withHeaders when true, the first line holds the column names
     * @param nullString  token that represents NULL; when blank, blank fields become NULL
     * @throws SQLException declared for API compatibility; row-level insert errors
     *                      are wrapped in RuntimeException by insertRow
     */
    public int loadCsv(String table, List<String> csvLines, boolean withHeaders, String nullString) throws SQLException {
        if (csvLines == null || csvLines.isEmpty())
            return 0;

        List<String[]> lines = parseLines(table, csvLines);

        String[] headers = buildHeaders(lines.get(0), withHeaders);
        List<Object[]> paramset = buildParameters(table, headers, lines, withHeaders, nullString);
        if (paramset.isEmpty()) // can have headers, but no rows
            return 0;

        String sql = buildSql(table, headers, paramset.get(0).length);
        LOGGER.info("Loading CSV using this sql: {}", sql);

        QueryRunner runner = new QueryRunner();
        // insertRow throws on the first failure, so reaching the end means
        // every parameter set was inserted.
        for (int i = 0; i < paramset.size(); i++) {
            insertRow(runner, i, sql, paramset.get(i));
        }
        LOGGER.info("Total of rows inserted: {}", paramset.size());
        return paramset.size();
    }

    /**
     * Parses each CSV line into a field array, enforcing that every row has
     * the same number of columns as the first one.
     */
    private List<String[]> parseLines(String table, List<String> lines) {
        int numCol = 0; // will check that every row has same columns than the first
        List<String[]> parsedLines = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            String[] parsedLine = csvParser.parseLine(lines.get(i));
            parsedLines.add(parsedLine);
            if (i == 0) {
                numCol = parsedLine.length;
            } else if (numCol != parsedLine.length) {
                String message = String.format("Error parsing CSV content to load into table %s: "
                        + "Row %d has %d columns and should have %d", table, i + 1, parsedLine.length, numCol);
                LOGGER.error(message);
                throw new RuntimeException(message);
            }
        }
        return parsedLines;
    }

    /** Returns the first line as headers, or an empty array when the CSV has none. */
    private String[] buildHeaders(String[] line, boolean withHeaders) {
        return withHeaders ? line : new String[] {};
    }

    /**
     * Converts every data row to a JDBC parameter array, applying NULL-token
     * substitution and column-type conversion; wraps conversion failures with
     * the offending row number.
     */
    private List<Object[]> buildParameters(String table, String[] headers, List<String[]> lines, boolean withHeaders, String nullString) {
        DbValueConverter converter = new DbValueConverter(conn, table);
        List<Object[]> paramset = new ArrayList<>();
        // Skip the header line, if present.
        for (int i = withHeaders ? 1 : 0; i < lines.size(); i++) {
            String[] parsedLine = lines.get(i);
            parsedLine = processNulls(parsedLine, nullString);
            Object[] params;
            try {
                params = converter.convertColumnValues(headers, parsedLine);
                // Pass the list itself so SLF4J formats it lazily (no eager toString()).
                LOGGER.info(" row [{}] params: {}", i + 1, Arrays.asList(params));
            } catch (Exception e) { // Not only SQLException as converter also does parsing
                String message = String.format("Error matching data type of parameters and table columns at CSV row %d", i + 1);
                LOGGER.error(message);
                LOGGER.error("Exception message: {}", e.getMessage());
                throw new RuntimeException(message, e);
            }
            paramset.add(params);
        }
        return paramset;
    }

    /**
     * Applies the NULL policy in place: with a blank nullString, blank fields
     * become NULL; otherwise blank fields become empty strings and only fields
     * matching nullString (case-insensitive, trimmed) become NULL.
     */
    private String[] processNulls(String[] line, String nullString) {
        for (int i = 0; i < line.length; i++) {
            if (StringUtils.isBlank(nullString) && StringUtils.isBlank(line[i])) {
                line[i] = null;
            } else if (!StringUtils.isBlank(nullString)) {
                if (StringUtils.isBlank(line[i])) // null must be empty string
                    line[i] = "";
                else if (nullString.trim().equalsIgnoreCase(line[i].trim()))
                    line[i] = null;
            }
        }
        return line;
    }

    /** Builds a parameterized INSERT, listing column names only when headers exist. */
    private String buildSql(String table, String[] headers, int columnCount) {
        String placeholders = IntStream.range(0, columnCount)
                .mapToObj(i -> "?").collect(Collectors.joining(","));
        return "INSERT INTO " + table
                + (headers.length > 0 ? " (" + String.join(",", headers) + ")" : "")
                + " VALUES (" + placeholders + ");";
    }

    /** Executes one INSERT; wraps any SQLException with the 1-based CSV row number. */
    private void insertRow(QueryRunner runner, int rowId, String sql, Object[] params) {
        try {
            runner.update(conn, sql, params);
        } catch (SQLException e) {
            String message = String.format("Error inserting data at CSV row %d", rowId + 1);
            LOGGER.error(message);
            LOGGER.error("Exception message: {}", e.getMessage());
            throw new RuntimeException(message, e);
        }
    }

}
101 changes: 101 additions & 0 deletions core/src/main/java/org/jsmart/zerocode/core/db/DbCsvRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.jsmart.zerocode.core.db;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Request payload for the DB SQL Executor LOADCSV operation: the target table
 * plus the CSV content (either an inline JSON array of lines or the path of an
 * external CSV file under src/test/resources) and parsing options.
 */
public class DbCsvRequest {
    private final String tableName;
    private final List<String> csvSource;
    private final Boolean withHeaders;
    private final String nullString;

    public DbCsvRequest(
            @JsonProperty(value="tableName", required=true) String tableName,
            @JsonProperty("csvSource") JsonNode csvSourceJsonNode,
            @JsonProperty("withHeaders") Boolean withHeaders,
            @JsonProperty("nullString") String nullString) {
        this.tableName = tableName;
        // Optional attributes fall back to safe defaults when absent in the JSON.
        this.withHeaders = Optional.ofNullable(withHeaders).orElse(false);
        this.nullString = Optional.ofNullable(nullString).orElse("");
        this.csvSource = Optional.ofNullable(csvSourceJsonNode).map(this::getCsvSourceFrom).orElse(Collections.emptyList());
    }

    public String getTableName() {
        return tableName;
    }

    public List<String> getCsvSource() {
        return csvSource;
    }

    public boolean getWithHeaders() {
        return withHeaders;
    }

    public String getNullString() {
        return nullString;
    }

    // Code below is duplicated from org.jsmart.zerocode.core.domain.Parametrized.java and not included in tests.
    // TODO Consider some refactoring later and review error message when file not found

    /** Resolves csvSource: a JSON array is read inline, anything else is treated as a file path. */
    private List<String> getCsvSourceFrom(JsonNode csvSourceJsonNode) {
        try {
            if (csvSourceJsonNode.isArray()) {
                return readCsvSourceFromJson(csvSourceJsonNode);

            } else {
                return readCsvSourceFromExternalCsvFile(csvSourceJsonNode);
            }
        } catch (IOException e) {
            throw new RuntimeException("Error deserializing csvSource", e);
        }
    }

    /** Deserializes an inline JSON array node into a list of CSV lines. */
    private List<String> readCsvSourceFromJson(JsonNode csvSourceJsonNode) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        ObjectReader reader = mapper.readerFor(new TypeReference<List<String>>() {
        });
        return reader.readValue(csvSourceJsonNode);
    }

    /**
     * Reads the CSV lines from a file relative to src/test/resources,
     * dropping blank lines. Returns an empty list for a blank path.
     */
    private List<String> readCsvSourceFromExternalCsvFile(JsonNode csvSourceJsonNode) throws IOException {
        String csvSourceFilePath = csvSourceJsonNode.textValue();
        if (StringUtils.isNotBlank(csvSourceFilePath)) {
            Path path = Paths.get("./src/test/resources/", csvSourceFilePath);
            // Files.lines holds the file open; close the stream to avoid leaking the handle.
            try (Stream<String> lines = Files.lines(path)) {
                return lines
                        .filter(StringUtils::isNotBlank)
                        .collect(Collectors.toList());
            }
        }
        return Collections.emptyList();
    }

    @Override
    public String toString() {
        // Fixed: previously printed "Parameterized{" (copy-paste leftover from another class).
        return "DbCsvRequest{" +
                "tableName=" + tableName +
                ", csvSource=" + csvSource +
                ", withHeaders=" + withHeaders +
                ", nullString=" + nullString +
                '}';
    }
}
119 changes: 119 additions & 0 deletions core/src/main/java/org/jsmart/zerocode/core/db/DbSqlExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package org.jsmart.zerocode.core.db;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.univocity.parsers.csv.CsvParser;

import org.apache.commons.dbutils.DbUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Interaction with a database using SQL to read/write
* Requires the appropriated connection data in the target environment
* properties, see src/test/resources/db_test.properties
*/
/**
 * Interaction with a database using SQL to read/write.
 * Requires the appropriate connection data in the target environment
 * properties, see src/test/resources/db_test.properties
 *
 * <p>Connections are created per operation via {@link java.sql.DriverManager}
 * and closed quietly in a finally block; override {@link #createAndGetConnection()}
 * to obtain connections differently (e.g. from a DataSource).
 */
public class DbSqlExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(DbSqlExecutor.class);
    // Response key under which EXECUTE returns the retrieved rows.
    public static final String SQL_RESULTS_KEY = "rows";
    // Response key under which LOADCSV returns the number of inserted rows.
    public static final String CSV_RESULTS_KEY = "size";

    // Optional to log the explanatory error message if the env variables are no defined
    @Inject(optional = true)
    @Named("db.driver.url") private String url;

    @Inject(optional = true)
    @Named("db.driver.user") private String user;

    @Inject(optional = true)
    @Named("db.driver.password") private String password;

    // Guice-provided CSV parser, shared with DbCsvLoader.
    @Inject
    private CsvParser csvParser;

    /**
     * The LOADCSV operation inserts the content of a CSV file into a table,
     * and returns the number of records inserted under the key "size"
     */
    public Map<String, Object> LOADCSV(DbCsvRequest request) { // uppercase for consistency with http api operations
        return loadcsv(request);
    }

    /**
     * Lowercase variant of {@link #LOADCSV(DbCsvRequest)}; opens a connection,
     * delegates the load to DbCsvLoader, and always closes the connection.
     *
     * @throws RuntimeException wrapping any failure during the load
     */
    public Map<String, Object> loadcsv(DbCsvRequest request) {
        Connection conn = createAndGetConnection();
        try {
            LOGGER.info("Load CSV, request -> {} ", request);
            DbCsvLoader runner = new DbCsvLoader(conn, csvParser);
            long result = runner.loadCsv(request.getTableName(), request.getCsvSource(),
                    request.getWithHeaders(), request.getNullString());
            Map<String, Object> response = new HashMap<>();
            response.put(CSV_RESULTS_KEY, result);
            return response;
        } catch (Exception e) {
            String message = "Failed to load CSV";
            LOGGER.error(message, e);
            throw new RuntimeException(message, e);
        } finally {
            closeConnection(conn);
        }
    }

    /**
     * The EXECUTE operation returns the records retrieved by the SQL specified in the request
     * under the key "rows" (select), or an empty object (insert, update)
     */
    public Map<String, Object> EXECUTE(DbSqlRequest request) {
        return execute(request);
    }

    /**
     * Lowercase variant of {@link #EXECUTE(DbSqlRequest)}; opens a connection,
     * runs the SQL through DbSqlRunner, and always closes the connection.
     * A null result (non-query statements) is mapped to an empty JSON object
     * so callers can still assert with "verify":{}.
     *
     * @throws RuntimeException wrapping any SQLException raised by the execution
     */
    public Map<String, Object> execute(DbSqlRequest request) {
        Connection conn = createAndGetConnection();
        try {
            LOGGER.info("Execute SQL, request -> {} ", request);
            DbSqlRunner runner = new DbSqlRunner(conn);
            List<Map<String, Object>> results = runner.execute(request.getSql(), request.getSqlParams());
            Map<String, Object> response = new HashMap<>();
            if (results == null) { // will return empty node, use "verify":{}
                response.put(SQL_RESULTS_KEY, new ObjectMapper().createObjectNode());
            } else {
                response.put(SQL_RESULTS_KEY, results);
            }
            return response;
        } catch (SQLException e) {
            String message = "Failed to execute SQL";
            LOGGER.error(message, e);
            throw new RuntimeException(message, e);
        } finally {
            closeConnection(conn);
        }
    }

    /**
     * Returns a new JDBC connection using DriverManager.
     * Override this method in case you get the connections using another approach
     * (e.g. DataSource)
     */
    protected Connection createAndGetConnection() {
        // NOTE: user is logged but the password deliberately is not.
        LOGGER.info("Create and get connection, url: {}, user: {}", url, user);
        try {
            return DriverManager.getConnection(url, user, password);
        } catch (SQLException e) {
            String message = "Failed to create connection, Please check the target environment properties "
                    + "to connect the database (db.driver.url, db.driver.user and db.driver.password)";
            LOGGER.error(message, e);
            throw new RuntimeException(message, e);
        }
    }

    /** Closes the connection without propagating any close-time exception. */
    protected void closeConnection(Connection conn) {
        DbUtils.closeQuietly(conn);
    }

}
Loading

0 comments on commit 9d481ea

Please sign in to comment.