Skip to content

Commit

Permalink
Update SQL queries for reading and coming migration
Browse files Browse the repository at this point in the history
(cherry picked from commit 5af1bc4)
  • Loading branch information
mohityadav766 committed Jul 26, 2023
1 parent 10209d9 commit 7ab9895
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1256,9 +1256,10 @@ List<Triple<String, String, String>> listToByPrefix(
int listDistinctCount();

@Deprecated
@SqlQuery("SELECT DISTINCT fromFQN, toFQN FROM field_relationship LIMIT :limit OFFSET :offset")
@SqlQuery(
"SELECT DISTINCT fromFQN, toFQN FROM field_relationship WHERE fromFQNHash = '' or fromFQNHash is null or toFQNHash = '' or toFQNHash is null LIMIT :limit")
@RegisterRowMapper(FieldRelationShipMapper.class)
List<Pair<String, String>> listDistinctWithOffset(@Bind("limit") int limit, @Bind("offset") int offset);
List<Pair<String, String>> migrationListDistinctWithOffset(@Bind("limit") int limit);

@SqlQuery(
"SELECT fromFQN, toFQN, json FROM field_relationship WHERE "
Expand Down Expand Up @@ -3355,9 +3356,10 @@ public ReportDataRow map(ResultSet rs, StatementContext ctx) throws SQLException
}
}

@SqlQuery("SELECT DISTINCT entityFQN FROM entity_extension_time_series LIMIT :limit OFFSET :offset")
@SqlQuery(
"SELECT DISTINCT entityFQN FROM entity_extension_time_series WHERE entityFQNHash = '' or entityFQNHash is null LIMIT :limit")
@Deprecated
List<String> listDistinctWithOffset(@Bind("limit") int limit, @Bind("offset") int offset);
List<String> migrationListDistinctWithOffset(@Bind("limit") int limit);
}

class EntitiesCountRowMapper implements RowMapper<EntitiesCount> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ List<String> listAfter(
// Plain paginated read of raw JSON rows from <table>.
@SqlQuery("SELECT json FROM <table> LIMIT :limit OFFSET :offset")
List<String> listAfterWithOffset(@Define("table") String table, @Bind("limit") int limit, @Bind("offset") int offset);

// Migration read: only rows whose <nameHashColumn> is still empty/null. No OFFSET — once a
// row's hash is backfilled it stops matching the predicate, so repeated calls drain the set.
@SqlQuery("SELECT json FROM <table> WHERE <nameHashColumn> = '' or <nameHashColumn> is null LIMIT :limit")
List<String> migrationListAfterWithOffset(
@Define("table") String table, @Define("nameHashColumn") String nameHashColumnName, @Bind("limit") int limit);

@SqlQuery("SELECT json FROM <table> <cond> AND " + "ORDER BY <nameColumn> " + "LIMIT :limit " + "OFFSET :offset")
List<String> listAfter(
@Define("table") String table,
Expand Down Expand Up @@ -358,6 +362,11 @@ default List<String> listAfterWithOffset(int limit, int offset) {
return listAfterWithOffset(getTableName(), limit, offset);
}

// Convenience overload bound to this DAO's table and name-hash column.
default List<String> migrationListAfterWithOffset(int limit) {
// No ordering: selection is driven solely by the empty/null name-hash predicate.
return migrationListAfterWithOffset(getTableName(), getNameHashColumn(), limit);
}

// Delegates with this DAO's table/name-hash column and the filter's rendered SQL condition.
default List<String> listAfter(ListFilter filter, int limit, int offset) {
return listAfter(getTableName(), getNameHashColumn(), filter.getCondition(), limit, offset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,14 @@ public static <T extends EntityInterface> void readAndProcessEntity(
Handle handle, String updateSql, Class<T> clazz, EntityDAO<T> dao, boolean withName, int limitParam)
throws IOException {
LOG.debug("Starting Migration for table : {}", dao.getTableName());
int offset = 0;
int totalCount = dao.listTotalCount();
while (offset < totalCount) {
while (true) {
PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
// Read from Database
List<String> jsons = dao.listAfterWithOffset(limitParam, offset);
offset = offset + limitParam;
List<String> jsons = dao.migrationListAfterWithOffset(limitParam);
LOG.debug("[{}]Read a Batch of Size: {}", dao.getTableName(), jsons.size());
if (jsons.isEmpty()) {
break;
}
// Process Update
for (String json : jsons) {
// Update the Statements to Database
Expand All @@ -168,6 +169,7 @@ public static <T extends EntityInterface> void readAndProcessEntity(
LOG.error("Failed in creating FQN Hash for Entity Name : {}", entity.getFullyQualifiedName(), ex);
}
}
LOG.debug("[{}]Committing a Batch of Size: {}", dao.getTableName(), jsons.size());
upsertBatch.execute();
}
LOG.debug("End Migration for table : {}", dao.getTableName());
Expand Down Expand Up @@ -276,70 +278,55 @@ public static void dataMigrationFQNHashing(Handle handle, CollectionDAO collecti
private static void updateFQNHashForFieldRelationship(
Handle handle, String updateSql, CollectionDAO collectionDAO, int limitParam) {
LOG.debug("Starting Migration for Field Relationship");
int offset = 0;
int totalCount;
try {
// This might result into exceptions if the column entityFQN is dropped once
totalCount = collectionDAO.fieldRelationshipDAO().listDistinctCount();
} catch (Exception ex) {
return;
}
if (totalCount > 0) {
while (offset < totalCount) {
PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
List<Pair<String, String>> entityFQNPairList =
collectionDAO.fieldRelationshipDAO().listDistinctWithOffset(limitParam, offset);
for (Pair<String, String> entityFQNPair : entityFQNPairList) {
try {
String fromFQNHash = FullyQualifiedName.buildHash(entityFQNPair.getLeft());
String toFQNHash = FullyQualifiedName.buildHash(entityFQNPair.getRight());
upsertBatch
.bind("fromFQNHash", fromFQNHash)
.bind("toFQNHash", toFQNHash)
.bind("fromFQN", entityFQNPair.getLeft())
.bind("toFQN", entityFQNPair.getRight())
.add();
} catch (Exception ex) {
LOG.error(
"Failed in creating fromFQN : {} , toFQN : {}", entityFQNPair.getLeft(), entityFQNPair.getRight(), ex);
}
while (true) {
PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
List<Pair<String, String>> entityFQNPairList =
collectionDAO.fieldRelationshipDAO().migrationListDistinctWithOffset(limitParam);
LOG.debug("[FieldRelationship] Read a Batch of Size: {}", entityFQNPairList.size());
if (entityFQNPairList.isEmpty()) {
break;
}
for (Pair<String, String> entityFQNPair : entityFQNPairList) {
try {
String fromFQNHash = FullyQualifiedName.buildHash(entityFQNPair.getLeft());
String toFQNHash = FullyQualifiedName.buildHash(entityFQNPair.getRight());
upsertBatch
.bind("fromFQNHash", fromFQNHash)
.bind("toFQNHash", toFQNHash)
.bind("fromFQN", entityFQNPair.getLeft())
.bind("toFQN", entityFQNPair.getRight())
.add();
} catch (Exception ex) {
LOG.error(
"Failed in creating fromFQN : {} , toFQN : {}", entityFQNPair.getLeft(), entityFQNPair.getRight(), ex);
}
upsertBatch.execute();
offset = offset + limitParam;
}
LOG.debug("[FieldRelationship] Committing a Batch of Size: {}", entityFQNPairList.size());
upsertBatch.execute();
}
LOG.debug("End Migration for Field Relationship");
}

private static void updateFQNHashEntityExtensionTimeSeries(
Handle handle, String updateSql, CollectionDAO collectionDAO, int limitParam) {
LOG.debug("Starting Migration for Entity Extension Time Series");
int offset = 0;
int totalCount;
try {
// This might result into exceptions if the column entityFQN is dropped once
totalCount = collectionDAO.entityExtensionTimeSeriesDao().listDistinctCount();
} catch (Exception ex) {
return;
}
if (totalCount > 0) {
while (offset < totalCount) {
PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
List<String> entityFQNLists =
collectionDAO.entityExtensionTimeSeriesDao().listDistinctWithOffset(limitParam, offset);
for (String entityFQN : entityFQNLists) {
try {
upsertBatch
.bind("entityFQNHash", FullyQualifiedName.buildHash(entityFQN))
.bind("entityFQN", entityFQN)
.add();
} catch (Exception ex) {
LOG.error("Failed in creating EntityFQN : {}", entityFQN, ex);
}
while (true) {
PreparedBatch upsertBatch = handle.prepareBatch(updateSql);
List<String> entityFQNLists =
collectionDAO.entityExtensionTimeSeriesDao().migrationListDistinctWithOffset(limitParam);
LOG.debug("[TimeSeries] Read a Batch of Size: {}", entityFQNLists.size());
if (entityFQNLists.isEmpty()) {
break;
}
for (String entityFQN : entityFQNLists) {
try {
upsertBatch.bind("entityFQNHash", FullyQualifiedName.buildHash(entityFQN)).bind("entityFQN", entityFQN).add();
} catch (Exception ex) {
LOG.error("Failed in creating EntityFQN : {}", entityFQN, ex);
}
upsertBatch.execute();
offset = offset + limitParam;
}
LOG.debug("[TimeSeries] Committing a Batch of Size: {}", entityFQNLists.size());
upsertBatch.execute();
}
LOG.debug("Ended Migration for Entity Extension Time Series");
}
Expand Down

0 comments on commit 7ab9895

Please sign in to comment.