Skip to content

Commit

Permalink
GEN-1493 - Fix paginate_es in opensearch (#17858)
Browse files Browse the repository at this point in the history
* GEN-1493 - Fix opensearch pagination

* GEN-1494 - Add CI for py-tests with Postgres and Opensearch

* GEN-1494 - Add CI for py-tests with Postgres and Opensearch
  • Loading branch information
pmbrull authored Sep 17, 2024
1 parent 49323ed commit 6a1cd0e
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/py-tests-postgres.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

name: py-tests
name: py-tests-postgres
on:
workflow_dispatch:
push:
Expand Down
1 change: 0 additions & 1 deletion docker/development/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ services:
retries: 10
volumes:
- es-data:/usr/share/elasticsearch/data


execute-migrate-all:
build:
Expand Down
33 changes: 20 additions & 13 deletions ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,19 +331,9 @@ def paginate_es(
else:
break

# Get the data
for hit in response.hits.hits:
try:
yield self.get_by_name(
entity=entity,
fqn=hit.source["fullyQualifiedName"],
fields=fields,
nullable=False, # Raise an error if we don't find the Entity
)
except Exception as exc:
logger.warning(
f"Error while getting {hit.source['fullyQualifiedName']} - {exc}"
)
yield from self._yield_hits_from_api(
response=response, entity=entity, fields=fields
)

# Get next page
last_hit = response.hits.hits[-1] if response.hits.hits else None
Expand All @@ -362,3 +352,20 @@ def _get_es_response(self, query_string: str) -> Optional[ESResponse]:
logger.debug(traceback.format_exc())
logger.warning(f"Error while getting ES response: {exc}")
return None

def _yield_hits_from_api(
self, response: ESResponse, entity: Type[T], fields: Optional[List[str]]
) -> Iterator[T]:
"""Get the data from the API based on ES responses"""
for hit in response.hits.hits:
try:
yield self.get_by_name(
entity=entity,
fqn=hit.source["fullyQualifiedName"],
fields=fields,
nullable=False, # Raise an error if we don't find the Entity
)
except Exception as exc:
logger.warning(
f"Error while getting {hit.source['fullyQualifiedName']} - {exc}"
)
2 changes: 0 additions & 2 deletions ingestion/tests/integration/ometa/test_ometa_es_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ def test_get_queries_with_lineage(self):
res = self.metadata.es_get_queries_with_lineage(self.service.name.root)
self.assertIn(self.checksum, res)

@pytest.skip("This never finished with Opensearch", allow_module_level=True)
def test_paginate_no_filter(self):
"""We can paginate all the data"""
# Since the test can run in parallel with other tables being there, we just
Expand All @@ -309,7 +308,6 @@ def test_paginate_no_filter(self):
assert asset
break

@pytest.skip("This never finished with Opensearch", allow_module_level=True)
def test_paginate_with_errors(self):
"""We don't want to stop the ES yields just because a single Entity has an error"""
# 1. First, prepare some tables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ public Response search(SearchRequest request, SubjectContext subjectContext) thr
}
}

if (!nullOrEmpty(request.getSearchAfter())) {
searchSourceBuilder.searchAfter(request.getSearchAfter());
}

/* For backward-compatibility we continue supporting the deleted argument, this should be removed in future versions */
if (request
.getIndex()
Expand Down

0 comments on commit 6a1cd0e

Please sign in to comment.