Skip to content

Commit

Permalink
feat: SQLAlchemy v2 support
Browse files: browse the repository at this point in the history
  • Loading branch information
smotornyuk committed Sep 6, 2024
1 parent bf849f1 commit 2910271
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 29 deletions.
10 changes: 6 additions & 4 deletions ckanext/harvest/harvesters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import re
import uuid

from sqlalchemy import exists, and_
import sqlalchemy as sa
from sqlalchemy.orm import contains_eager

from ckantoolkit import config
Expand Down Expand Up @@ -344,7 +344,9 @@ def _create_or_update_package(self, package_dict, harvest_object,
# plugin)
harvest_object.add()

model.Session.execute('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED')
model.Session.execute(
sa.text('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED')
)
model.Session.flush()

new_package = p.toolkit.get_action(
Expand Down Expand Up @@ -400,10 +402,10 @@ def last_error_free_job(cls, harvest_job):
.filter(HarvestJob.status == 'Finished')
.filter(HarvestJob.id != harvest_job.id)
.filter(
~exists().where(
~sa.exists().where(
HarvestGatherError.harvest_job_id == HarvestJob.id))
.outerjoin(HarvestObject,
and_(HarvestObject.harvest_job_id == HarvestJob.id,
sa.and_(HarvestObject.harvest_job_id == HarvestJob.id,
HarvestObject.current == False, # noqa: E712
HarvestObject.report_status != 'not modified'))
.options(contains_eager(HarvestJob.objects))
Expand Down
58 changes: 34 additions & 24 deletions ckanext/harvest/logic/action/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import logging
import datetime

import sqlalchemy as sa
from ckantoolkit import config
from sqlalchemy import and_, or_
from urllib.parse import urljoin

from ckan.lib.search.index import PackageSearchIndex
Expand Down Expand Up @@ -125,7 +125,7 @@ def harvest_source_clear(context, data_dict):
select package_id from harvest_object
where harvest_source_id = '{harvest_source_id}'));'''.format(
harvest_source_id=harvest_source_id)
result = model.Session.execute(sql)
result = model.Session.execute(sa.text(sql))
ids = []
for row in result:
ids.append(row[0])
Expand All @@ -137,14 +137,18 @@ def harvest_source_clear(context, data_dict):
where harvest_source_id = '{harvest_source_id}');'''.format(
harvest_source_id=harvest_source_id)

if toolkit.check_ckan_version(max_version='2.10.99'):
sql += '''
delete from resource_revision where package_id in (
select id from package where state = 'to_delete');
'''

# CKAN-2.3 or above: delete resource views & resources (resource revisions
# are deleted in the separate CKAN <= 2.10.99 block above)
if toolkit.check_ckan_version(min_version='2.3'):
sql += '''
delete from resource_view where resource_id in (
select id from resource where package_id in (
select id from package where state = 'to_delete'));
delete from resource_revision where package_id in (
select id from package where state = 'to_delete');
delete from resource where package_id in (
select id from package where state = 'to_delete');
'''
Expand Down Expand Up @@ -172,6 +176,24 @@ def harvest_source_clear(context, data_dict):
and context = 'Package';
'''

if toolkit.check_ckan_version(max_version='2.10.99'):
sql += '''
delete from package_tag_revision where package_id in (
select id from package where state = 'to_delete');
delete from member_revision where table_id in (
select id from package where state = 'to_delete');
delete from package_extra_revision where package_id in (
select id from package where state = 'to_delete');
delete from package_revision where id in (
select id from package where state = 'to_delete');
delete from package_relationship_revision where subject_package_id in (
select id from package where state = 'to_delete');
delete from package_relationship_revision where object_package_id in (
select id from package where state = 'to_delete');
delete from package_extra where package_id in (
select id from package where state = 'to_delete');
'''

sql += '''
delete from harvest_object_error where harvest_object_id in (
select id from harvest_object
Expand All @@ -183,22 +205,8 @@ def harvest_source_clear(context, data_dict):
delete from harvest_gather_error where harvest_job_id in (
select id from harvest_job where source_id = '{harvest_source_id}');
delete from harvest_job where source_id = '{harvest_source_id}';
delete from package_tag_revision where package_id in (
select id from package where state = 'to_delete');
delete from member_revision where table_id in (
select id from package where state = 'to_delete');
delete from package_extra_revision where package_id in (
select id from package where state = 'to_delete');
delete from package_revision where id in (
select id from package where state = 'to_delete');
delete from package_tag where package_id in (
select id from package where state = 'to_delete');
delete from package_extra where package_id in (
select id from package where state = 'to_delete');
delete from package_relationship_revision where subject_package_id in (
select id from package where state = 'to_delete');
delete from package_relationship_revision where object_package_id in (
select id from package where state = 'to_delete');
delete from package_relationship where subject_package_id in (
select id from package where state = 'to_delete');
delete from package_relationship where object_package_id in (
Expand Down Expand Up @@ -226,7 +234,7 @@ def harvest_source_clear(context, data_dict):
sql += '''
commit;
'''
model.Session.execute(sql)
model.Session.execute(sa.text(sql))

# Refresh the index for this source to update the status object
get_action('harvest_source_reindex')(context, {'id': harvest_source_id})
Expand Down Expand Up @@ -408,7 +416,7 @@ def harvest_source_job_history_clear(context, data_dict):
COMMIT;
'''.format(harvest_source_id=harvest_source_id)

model.Session.execute(sql)
model.Session.execute(sa.text(sql))

# Refresh the index for this source to update the status object
get_action('harvest_source_reindex')(context, {'id': harvest_source_id})
Expand Down Expand Up @@ -529,8 +537,8 @@ def harvest_objects_import(context, data_dict):
.join(Package)
.filter(HarvestObject.current == True) # noqa: E712
.filter(Package.state == u'active')
.filter(or_(Package.id == package_id_or_name,
Package.name == package_id_or_name)))
.filter(sa.or_(Package.id == package_id_or_name,
Package.name == package_id_or_name)))
join_datasets = False
else:
last_objects_ids = \
Expand Down Expand Up @@ -671,7 +679,7 @@ def harvest_jobs_run(context, data_dict):
num_objects_in_progress = \
session.query(HarvestObject.id) \
.filter(HarvestObject.harvest_job_id == job['id']) \
.filter(and_((HarvestObject.state != u'COMPLETE'),
.filter(sa.and_((HarvestObject.state != u'COMPLETE'),
(HarvestObject.state != u'ERROR'))) \
.count()

Expand Down Expand Up @@ -976,7 +984,9 @@ def harvest_source_reindex(context, data_dict):
del context['extras_as_string']
context.update({'ignore_auth': True})
package_dict = logic.get_action('harvest_source_show')(
context, {'id': harvest_source_id})
dict(context, validate=False, use_cache=False),
{'id': harvest_source_id},
)
log.debug('Updating search index for harvest source: %s',
package_dict.get('name') or harvest_source_id)

Expand Down
1 change: 0 additions & 1 deletion ckanext/harvest/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,6 @@ def harvest_object_before_insert_listener(mapper, connection, target):
if not target.harvest_source_id or not target.source:
if not target.job:
raise Exception("You must define a Harvest Job for each Harvest Object")
target.source = target.job.source
target.harvest_source_id = target.job.source.id


Expand Down

0 comments on commit 2910271

Please sign in to comment.