diff --git a/ckanext/harvest/harvesters/base.py b/ckanext/harvest/harvesters/base.py index 3a51bbad..80c3e3ea 100644 --- a/ckanext/harvest/harvesters/base.py +++ b/ckanext/harvest/harvesters/base.py @@ -4,7 +4,7 @@ import re import uuid -from sqlalchemy import exists, and_ +import sqlalchemy as sa from sqlalchemy.orm import contains_eager from ckantoolkit import config @@ -344,7 +344,9 @@ def _create_or_update_package(self, package_dict, harvest_object, # plugin) harvest_object.add() - model.Session.execute('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED') + model.Session.execute( + sa.text('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED') + ) model.Session.flush() new_package = p.toolkit.get_action( @@ -400,12 +402,12 @@ def last_error_free_job(cls, harvest_job): .filter(HarvestJob.status == 'Finished') .filter(HarvestJob.id != harvest_job.id) .filter( - ~exists().where( + ~sa.exists().where( HarvestGatherError.harvest_job_id == HarvestJob.id)) .outerjoin(HarvestObject, - and_(HarvestObject.harvest_job_id == HarvestJob.id, - HarvestObject.current == False, # noqa: E712 - HarvestObject.report_status != 'not modified')) + sa.and_(HarvestObject.harvest_job_id == HarvestJob.id, + HarvestObject.current == False, # noqa: E712 + HarvestObject.report_status != 'not modified')) .options(contains_eager(HarvestJob.objects)) .order_by(HarvestJob.gather_started.desc())) # now check them until we find one with no fetch/import errors diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index f97a61d9..aad3ddce 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -7,8 +7,8 @@ import logging import datetime +import sqlalchemy as sa from ckantoolkit import config -from sqlalchemy import and_, or_ from urllib.parse import urljoin from ckan.lib.search.index import PackageSearchIndex @@ -194,7 +194,8 @@ def harvest_source_clear(context, data_dict): sql += """ COMMIT; """ - model.Session.execute(sql) + + model.Session.execute(sa.text(sql)) # Refresh the index for this source to update the status object get_action("harvest_source_reindex")(context, {"id": harvest_source_id}) @@ -376,7 +377,7 @@ def harvest_source_job_history_clear(context, data_dict): COMMIT; '''.format(harvest_source_id=harvest_source_id) - model.Session.execute(sql) + model.Session.execute(sa.text(sql)) # Refresh the index for this source to update the status object get_action('harvest_source_reindex')(context, {'id': harvest_source_id}) @@ -497,8 +498,8 @@ def harvest_objects_import(context, data_dict): .join(Package) .filter(HarvestObject.current == True) # noqa: E712 .filter(Package.state == u'active') - .filter(or_(Package.id == package_id_or_name, - Package.name == package_id_or_name))) + .filter(sa.or_(Package.id == package_id_or_name, + Package.name == package_id_or_name))) join_datasets = False else: last_objects_ids = \ @@ -639,8 +640,8 @@ def harvest_jobs_run(context, data_dict): num_objects_in_progress = \ session.query(HarvestObject.id) \ .filter(HarvestObject.harvest_job_id == job['id']) \ - .filter(and_((HarvestObject.state != u'COMPLETE'), - (HarvestObject.state != u'ERROR'))) \ + .filter(sa.and_((HarvestObject.state != u'COMPLETE'), + (HarvestObject.state != u'ERROR'))) \ .count() if num_objects_in_progress == 0: @@ -947,7 +948,9 @@ def harvest_source_reindex(context, data_dict): 'validate': False, }) package_dict = logic.get_action('harvest_source_show')( - context, {'id': harvest_source_id}) + dict(context, validate=False, use_cache=False), + {'id': harvest_source_id}, + ) log.debug('Updating search index for harvest source: %s', package_dict.get('name') or harvest_source_id) diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py index c347cb48..aa582275 100644 --- a/ckanext/harvest/model/__init__.py +++ b/ckanext/harvest/model/__init__.py @@ -360,7 +360,6 @@ def harvest_object_before_insert_listener(mapper, connection, target): if not target.harvest_source_id or not target.source: if not target.job: raise Exception("You must define a Harvest Job for each Harvest Object") - target.source = target.job.source target.harvest_source_id = target.job.source.id