From 7ab9895fc9d506e086709426d6d57ed3aecb1a9e Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Wed, 26 Jul 2023 17:48:46 +0530 Subject: [PATCH] Update SQL queries for reading and committing migration (cherry picked from commit 5af1bc4a55b644881a1dac2fcf41053386140146) --- .../service/jdbi3/CollectionDAO.java | 10 +- .../openmetadata/service/jdbi3/EntityDAO.java | 9 ++ .../service/migration/MigrationUtil.java | 103 ++++++++---------- 3 files changed, 60 insertions(+), 62 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 4d7c6912f778..5205f0c163ce 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -1256,9 +1256,10 @@ List> listToByPrefix( int listDistinctCount(); @Deprecated - @SqlQuery("SELECT DISTINCT fromFQN, toFQN FROM field_relationship LIMIT :limit OFFSET :offset") + @SqlQuery( + "SELECT DISTINCT fromFQN, toFQN FROM field_relationship WHERE fromFQNHash = '' or fromFQNHash is null or toFQNHash = '' or toFQNHash is null LIMIT :limit") @RegisterRowMapper(FieldRelationShipMapper.class) - List> listDistinctWithOffset(@Bind("limit") int limit, @Bind("offset") int offset); + List> migrationListDistinctWithOffset(@Bind("limit") int limit); @SqlQuery( "SELECT fromFQN, toFQN, json FROM field_relationship WHERE " @@ -3355,9 +3356,10 @@ public ReportDataRow map(ResultSet rs, StatementContext ctx) throws SQLException } } - @SqlQuery("SELECT DISTINCT entityFQN FROM entity_extension_time_series LIMIT :limit OFFSET :offset") + @SqlQuery( + "SELECT DISTINCT entityFQN FROM entity_extension_time_series WHERE entityFQNHash = '' or entityFQNHash is null LIMIT :limit") @Deprecated - List listDistinctWithOffset(@Bind("limit") int limit, @Bind("offset") int offset); + List 
migrationListDistinctWithOffset(@Bind("limit") int limit); } class EntitiesCountRowMapper implements RowMapper { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java index f56b4a0b873f..066c17036e5d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityDAO.java @@ -227,6 +227,10 @@ List listAfter( @SqlQuery("SELECT json FROM LIMIT :limit OFFSET :offset") List listAfterWithOffset(@Define("table") String table, @Bind("limit") int limit, @Bind("offset") int offset); + @SqlQuery("SELECT json FROM
WHERE = '' or is null LIMIT :limit") + List migrationListAfterWithOffset( + @Define("table") String table, @Define("nameHashColumn") String nameHashColumnName, @Bind("limit") int limit); + @SqlQuery("SELECT json FROM
AND " + "ORDER BY " + "LIMIT :limit " + "OFFSET :offset") List listAfter( @Define("table") String table, @@ -358,6 +362,11 @@ default List listAfterWithOffset(int limit, int offset) { return listAfterWithOffset(getTableName(), limit, offset); } + default List migrationListAfterWithOffset(int limit) { + // No ordering + return migrationListAfterWithOffset(getTableName(), getNameHashColumn(), limit); + } + default List listAfter(ListFilter filter, int limit, int offset) { return listAfter(getTableName(), getNameHashColumn(), filter.getCondition(), limit, offset); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/MigrationUtil.java index 56cbac4cacf2..4abc06b184f1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/MigrationUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/MigrationUtil.java @@ -147,13 +147,14 @@ public static void readAndProcessEntity( Handle handle, String updateSql, Class clazz, EntityDAO dao, boolean withName, int limitParam) throws IOException { LOG.debug("Starting Migration for table : {}", dao.getTableName()); - int offset = 0; - int totalCount = dao.listTotalCount(); - while (offset < totalCount) { + while (true) { PreparedBatch upsertBatch = handle.prepareBatch(updateSql); // Read from Database - List jsons = dao.listAfterWithOffset(limitParam, offset); - offset = offset + limitParam; + List jsons = dao.migrationListAfterWithOffset(limitParam); + LOG.debug("[{}]Read a Batch of Size: {}", dao.getTableName(), jsons.size()); + if (jsons.isEmpty()) { + break; + } // Process Update for (String json : jsons) { // Update the Statements to Database @@ -168,6 +169,7 @@ public static void readAndProcessEntity( LOG.error("Failed in creating FQN Hash for Entity Name : {}", entity.getFullyQualifiedName(), ex); } } + LOG.debug("[{}]Committing a Batch of Size: {}", 
dao.getTableName(), jsons.size()); upsertBatch.execute(); } LOG.debug("End Migration for table : {}", dao.getTableName()); @@ -276,37 +278,31 @@ public static void dataMigrationFQNHashing(Handle handle, CollectionDAO collecti private static void updateFQNHashForFieldRelationship( Handle handle, String updateSql, CollectionDAO collectionDAO, int limitParam) { LOG.debug("Starting Migration for Field Relationship"); - int offset = 0; - int totalCount; - try { - // This might result into exceptions if the column entityFQN is dropped once - totalCount = collectionDAO.fieldRelationshipDAO().listDistinctCount(); - } catch (Exception ex) { - return; - } - if (totalCount > 0) { - while (offset < totalCount) { - PreparedBatch upsertBatch = handle.prepareBatch(updateSql); - List> entityFQNPairList = - collectionDAO.fieldRelationshipDAO().listDistinctWithOffset(limitParam, offset); - for (Pair entityFQNPair : entityFQNPairList) { - try { - String fromFQNHash = FullyQualifiedName.buildHash(entityFQNPair.getLeft()); - String toFQNHash = FullyQualifiedName.buildHash(entityFQNPair.getRight()); - upsertBatch - .bind("fromFQNHash", fromFQNHash) - .bind("toFQNHash", toFQNHash) - .bind("fromFQN", entityFQNPair.getLeft()) - .bind("toFQN", entityFQNPair.getRight()) - .add(); - } catch (Exception ex) { - LOG.error( - "Failed in creating fromFQN : {} , toFQN : {}", entityFQNPair.getLeft(), entityFQNPair.getRight(), ex); - } + while (true) { + PreparedBatch upsertBatch = handle.prepareBatch(updateSql); + List> entityFQNPairList = + collectionDAO.fieldRelationshipDAO().migrationListDistinctWithOffset(limitParam); + LOG.debug("[FieldRelationship] Read a Batch of Size: {}", entityFQNPairList.size()); + if (entityFQNPairList.isEmpty()) { + break; + } + for (Pair entityFQNPair : entityFQNPairList) { + try { + String fromFQNHash = FullyQualifiedName.buildHash(entityFQNPair.getLeft()); + String toFQNHash = FullyQualifiedName.buildHash(entityFQNPair.getRight()); + upsertBatch + 
.bind("fromFQNHash", fromFQNHash) + .bind("toFQNHash", toFQNHash) + .bind("fromFQN", entityFQNPair.getLeft()) + .bind("toFQN", entityFQNPair.getRight()) + .add(); + } catch (Exception ex) { + LOG.error( + "Failed in creating fromFQN : {} , toFQN : {}", entityFQNPair.getLeft(), entityFQNPair.getRight(), ex); } - upsertBatch.execute(); - offset = offset + limitParam; } + LOG.debug("[FieldRelationship] Committing a Batch of Size: {}", entityFQNPairList.size()); + upsertBatch.execute(); } LOG.debug("End Migration for Field Relationship"); } @@ -314,32 +310,23 @@ private static void updateFQNHashForFieldRelationship( private static void updateFQNHashEntityExtensionTimeSeries( Handle handle, String updateSql, CollectionDAO collectionDAO, int limitParam) { LOG.debug("Starting Migration for Entity Extension Time Series"); - int offset = 0; - int totalCount; - try { - // This might result into exceptions if the column entityFQN is dropped once - totalCount = collectionDAO.entityExtensionTimeSeriesDao().listDistinctCount(); - } catch (Exception ex) { - return; - } - if (totalCount > 0) { - while (offset < totalCount) { - PreparedBatch upsertBatch = handle.prepareBatch(updateSql); - List entityFQNLists = - collectionDAO.entityExtensionTimeSeriesDao().listDistinctWithOffset(limitParam, offset); - for (String entityFQN : entityFQNLists) { - try { - upsertBatch - .bind("entityFQNHash", FullyQualifiedName.buildHash(entityFQN)) - .bind("entityFQN", entityFQN) - .add(); - } catch (Exception ex) { - LOG.error("Failed in creating EntityFQN : {}", entityFQN, ex); - } + while (true) { + PreparedBatch upsertBatch = handle.prepareBatch(updateSql); + List entityFQNLists = + collectionDAO.entityExtensionTimeSeriesDao().migrationListDistinctWithOffset(limitParam); + LOG.debug("[TimeSeries] Read a Batch of Size: {}", entityFQNLists.size()); + if (entityFQNLists.isEmpty()) { + break; + } + for (String entityFQN : entityFQNLists) { + try { + upsertBatch.bind("entityFQNHash", 
FullyQualifiedName.buildHash(entityFQN)).bind("entityFQN", entityFQN).add(); + } catch (Exception ex) { + LOG.error("Failed in creating EntityFQN : {}", entityFQN, ex); } - upsertBatch.execute(); - offset = offset + limitParam; } + LOG.debug("[TimeSeries] Committing a Batch of Size: {}", entityFQNLists.size()); + upsertBatch.execute(); } LOG.debug("Ended Migration for Entity Extension Time Series"); }